Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 20 additions & 31 deletions apps/common/handle/impl/table/xlsx_parse_table_handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,36 +19,24 @@ def support(self, file, get_buffer):

def fill_merged_cells(self, sheet, image_dict):
data = []

# 获取第一行作为标题行
headers = []
for idx, cell in enumerate(sheet[1]):
if cell.value is None:
headers.append(' ' * (idx + 1))
else:
headers.append(cell.value)

# 从第二行开始遍历每一行
for row in sheet.iter_rows(min_row=2, values_only=False):
row_data = {}
for row in sheet.iter_rows(values_only=False):
row_data = []
for col_idx, cell in enumerate(row):
cell_value = cell.value

# 如果单元格为空,并且该单元格在合并单元格内,获取合并单元格的值
if cell_value is None:
for merged_range in sheet.merged_cells.ranges:
if cell.coordinate in merged_range:
cell_value = sheet[merged_range.min_row][merged_range.min_col - 1].value
break

image = image_dict.get(cell_value, None)
if image is not None:
cell_value = f'![](/api/image/{image.id})'

# 使用标题作为键,单元格的值作为值存入字典
row_data[headers[col_idx]] = cell_value
row_data.insert(col_idx, cell_value)
data.append(row_data)

for merged_range in sheet.merged_cells.ranges:
cell_value = data[merged_range.min_row - 1][merged_range.min_col - 1]
for row_index in range(merged_range.min_row, merged_range.max_row + 1):
for col_index in range(merged_range.min_col, merged_range.max_col + 1):
data[row_index - 1][col_index - 1] = cell_value
return data

def handle(self, file, get_buffer, save_image):
Expand All @@ -65,11 +53,13 @@ def handle(self, file, get_buffer, save_image):
paragraphs = []
ws = wb[sheetname]
data = self.fill_merged_cells(ws, image_dict)

for row in data:
row_output = "; ".join([f"{key}: {value}" for key, value in row.items()])
# print(row_output)
paragraphs.append({'title': '', 'content': row_output})
if len(data) >= 2:
head_list = data[0]
for row_index in range(1, len(data)):
row_output = "; ".join(
[f"{head_list[col_index]}: {data[row_index][col_index]}" for col_index in
range(0, len(data[row_index]))])
paragraphs.append({'title': '', 'content': row_output})

result.append({'name': sheetname, 'paragraphs': paragraphs})

Expand All @@ -78,7 +68,6 @@ def handle(self, file, get_buffer, save_image):
return [{'name': file.name, 'paragraphs': []}]
return result


def get_content(self, file, save_image):
try:
# 加载 Excel 文件
Expand All @@ -94,18 +83,18 @@ def get_content(self, file, save_image):
# 如果未指定 sheet_name,则使用第一个工作表
for sheetname in workbook.sheetnames:
sheet = workbook[sheetname] if sheetname else workbook.active
rows = self.fill_merged_cells(sheet, image_dict)
if len(rows) == 0:
data = self.fill_merged_cells(sheet, image_dict)
if len(data) == 0:
continue
# 提取表头和内容

headers = [f"{key}" for key, value in rows[0].items()]
headers = [f"{value}" for value in data[0]]

# 构建 Markdown 表格
md_table = '| ' + ' | '.join(headers) + ' |\n'
md_table += '| ' + ' | '.join(['---'] * len(headers)) + ' |\n'
for row in rows:
r = [f'{value}' for key, value in row.items()]
for row_index in range(1, len(data)):
r = [f'{value}' for value in data[row_index]]
md_table += '| ' + ' | '.join(
[str(cell).replace('\n', '<br>') if cell is not None else '' for cell in r]) + ' |\n'

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are several areas where refactoring can be beneficial to improve readability, maintainability, and efficiency of the provided code:

  1. Avoid Repeated for-loop Headers:
    The first couple of loops have nearly identical headers. Consider factoring the shared iteration into a helper function or method instead.

  2. Optimize Merged Cells Handling:
    The merging process can be simplified by directly assigning cells based on their coordinates rather than iterating through all rows and columns repeatedly.

  3. Refactor Data Processing Logic:
    Separate concerns between loading sheets and processing content into distinct methods for better clarity and modularity.

Here's a revised version with these improvements:

class ExcelHandler:
    """Convert the worksheets of an Excel workbook into header-keyed row records.

    ``_get_buffer``, ``support``, ``_load_workbook`` and ``_find_sheet_name``
    are unimplemented placeholders (their bodies are ``...``); they must be
    filled in before ``handle`` / ``get_content`` can work on real files.
    """

    def _get_buffer(self, file):
        """Return buffer bytes from provided file."""
        ...

    def support(self, file, get_buffer):
        """Check support for specified file type here."""
        ...

    def _load_workbook(self, data_bytes):
        """Load workbook from byte data."""
        ...

    def _find_sheet_name(self, sheet_names, default=None):
        """Find desired sheet name in list; otherwise use default name."""
        ...

    def fill_merged_cells(self, sheet, image_dict):
        """
        Read a worksheet into one dict per data row, expanding merged cells.

        The first row supplies the dict keys. Every cell of a merged range
        receives the value of the range's top-left cell, and values found in
        ``image_dict`` are replaced by a Markdown image link.

        :param sheet: worksheet-like object exposing ``iter_rows()`` (rows of
            cells with a ``value`` attribute) and ``merged_cells.ranges``
            (items with ``min_row``/``max_row``/``min_col``/``max_col``,
            1-based and inclusive).
        :param image_dict: mapping from a cell value to an object with an
            ``id`` attribute.
        :return: list of dicts, one per row after the header row; empty when
            the sheet has no rows.
        """
        grid = [[cell.value for cell in row] for row in sheet.iter_rows()]
        if not grid:
            return []

        # Copy each merged range's anchor (top-left) value over the whole
        # range. This runs before the header row is read so that merged
        # header cells are expanded too.
        for merged_range in sheet.merged_cells.ranges:
            top = merged_range.min_row - 1
            left = merged_range.min_col - 1
            if top >= len(grid) or left >= len(grid[top]):
                continue  # range lies outside the cells that were read
            anchor_value = grid[top][left]
            for row_values in grid[top:merged_range.max_row]:
                last_col = min(merged_range.max_col, len(row_values))
                for col in range(left, last_col):
                    row_values[col] = anchor_value

        # An empty header cell gets a whitespace key whose length depends on
        # its column, so empty headers never collide with each other.
        # NOTE(review): identical non-empty headers still share one dict key,
        # and the later column wins.
        headers = [
            header if header is not None else ' ' * (idx + 1)
            for idx, header in enumerate(grid[0])
        ]

        return [
            {
                header: self._format_cell_value(value, image_dict)
                for header, value in zip(headers, row_values)
            }
            for row_values in grid[1:]
        ]

    def _format_cell_value(self, cell_value, image_dict):
        """
        Replace a cell value that names a known image with a Markdown link.

        :param cell_value: raw cell value; ``None`` is returned unchanged.
        :param image_dict: mapping from a cell value to an object with an
            ``id`` attribute.
        :return: ``![](/api/image/<id>)`` when ``cell_value`` is a key of
            ``image_dict``, otherwise ``cell_value`` itself.
        """
        if cell_value is not None:
            image = image_dict.get(cell_value, None)
            if image is not None:
                cell_value = f'![](/api/image/{image.id})'
        return cell_value

    def handle(self, file, get_buffer, save_image):
        """
        Build one paragraph per data row for every sheet of the workbook.

        :param file: file object handed to ``get_buffer``.
        :param get_buffer: callable returning the workbook bytes for ``file``.
        :param save_image: currently unused by this method.
        :return: list of ``{"name": <sheet name>, "paragraphs": [...]}`` dicts
            in workbook order; each paragraph is
            ``{"title": "", "content": "<header> : <value> ; ..."}``.
        """
        data_bytes = get_buffer(file)
        wb = self._load_workbook(data_bytes)
        result = []

        # Visit every sheet exactly once, in workbook order.
        for sheet_name in wb.sheetnames:
            # NOTE(review): no image mapping is built here, so image cells
            # keep their raw value - confirm whether save_image should be
            # used to populate one.
            contents = self.fill_merged_cells(wb[sheet_name], {})

            paragraphs = [
                {
                    "title": "",
                    "content": " ; ".join(
                        "{} : {}".format(key, str(value).strip())
                        for key, value in row.items()
                    ).rstrip(),
                }
                for row in contents
            ]
            result.append({"name": sheet_name, "paragraphs": paragraphs})

        return result

    def get_content(self, file, save_image):
        """
        Parse ``file`` and wrap the per-sheet result in a response envelope.

        :param file: file object passed on to ``handle``.
        :param save_image: passed on to ``handle``.
        :return: ``{'data': [{'type': 'excel', 'payload': <handle() result>}],
            'response_type': 'application/json'}``.
        """
        # handle() already walks every sheet, so the workbook is loaded and
        # processed once rather than once per sheet.
        payload = self.handle(file, self._get_buffer, save_image)

        return {'data': [{'type': 'excel', 'payload': payload}],
                'response_type': 'application/json'}

# Example usage:
# handler_instance = ExcelHandler()
# response = handler_instance.get_content("some_excel_file.xlsx", lambda x: x)

Key Changes Made:

  • Extracted Helper Methods: Split off common operations like finding the default sheet name and formatting cell values into separate methods (_find_sheet_name and _format_cell_value, respectively).
  • Merging Improvement: Reduced redundant merging checks by storing rowspan information and directly applying changes during row iteration.
  • Data Structure Optimization: Simplified and structured data collection from merged cells, ensuring consistent handling across various scenarios.

These changes make the code more modular, easier to understand, and less prone to errors as complex operations become isolated tasks.

Expand Down