Skip to content

Commit 17e89d5

Browse files
author
root
committed
feat: add parallel processing for slice mapping (threshold >= 1000)
- Add parallelThreshold config for large slice mapping - Implement parallel MapDirectSlice, MapDirectPtrSlice, SafeMapDirectSlice - Fix WaitGroup initialization in safeMapDirectSliceParallel - Fix error message to support index > 9 - Add field mapping pre-warm for MapDirectPtrSlice
1 parent 36d1d4f commit 17e89d5

File tree

1 file changed

+186
-11
lines changed

1 file changed

+186
-11
lines changed

mapper_func.go

Lines changed: 186 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,15 @@ package mapper
22

33
import (
44
"errors"
5+
"fmt"
56
"reflect"
7+
"runtime"
68
"sync"
79
)
810

9-
// ============ 字段映射缓存 ============
11+
// parallelThreshold 并行处理的阈值
12+
// 当 slice 长度 >= 此值时启用并行映射
13+
var parallelThreshold = 1000
1014

1115
// fieldMappingCache 字段映射缓存
1216
// key: fromType -> toType, value: field mappings
@@ -135,31 +139,98 @@ func MapDirectSlice[From, To any](from []From) []To {
135139
if from == nil {
136140
return nil
137141
}
138-
139-
// 尝试预获取映射关系以优化批量操作
142+
143+
length := len(from)
144+
if length == 0 {
145+
return []To{}
146+
}
147+
148+
// 预获取映射关系以优化批量操作
140149
fromType := reflect.TypeOf((*From)(nil)).Elem()
141150
toType := reflect.TypeOf((*To)(nil)).Elem()
142-
151+
143152
_, hasCache := getFieldMappings(fromType, toType)
144153
if !hasCache {
145154
buildFieldMappings(fromType, toType)
146155
}
147-
148-
result := make([]To, len(from))
156+
157+
// 大 slice 使用并行处理
158+
if length >= parallelThreshold {
159+
return mapDirectSliceParallel[From, To](from)
160+
}
161+
162+
result := make([]To, length)
149163
for i, v := range from {
150164
result[i] = MapDirect[From, To](v)
151165
}
152166
return result
153167
}
154168

169+
// mapDirectSliceParallel 并行批量映射
170+
func mapDirectSliceParallel[From, To any](from []From) []To {
171+
length := len(from)
172+
result := make([]To, length)
173+
174+
// 计算合适的 worker 数量
175+
numCPU := runtime.NumCPU()
176+
numWorkers := length / 100 // 每 100 个元素一个 worker
177+
if numWorkers < 1 {
178+
numWorkers = 1
179+
}
180+
if numWorkers > numCPU {
181+
numWorkers = numCPU
182+
}
183+
184+
chunkSize := length / numWorkers
185+
var wg sync.WaitGroup
186+
wg.Add(numWorkers)
187+
188+
for w := 0; w < numWorkers; w++ {
189+
start := w * chunkSize
190+
end := start + chunkSize
191+
if w == numWorkers-1 {
192+
end = length // 最后一个 worker 处理剩余部分
193+
}
194+
195+
go func(s, e int) {
196+
defer wg.Done()
197+
for i := s; i < e; i++ {
198+
result[i] = MapDirect[From, To](from[i])
199+
}
200+
}(start, end)
201+
}
202+
203+
wg.Wait()
204+
return result
205+
}
206+
155207
// MapDirectPtrSlice 指针切片映射
156208
// 使用示例: dtos := mapper.MapDirectPtrSlice[User, UserDTO](&users)
157209
func MapDirectPtrSlice[From, To any](from []*From) []*To {
158210
if from == nil {
159211
return nil
160212
}
161-
162-
result := make([]*To, len(from))
213+
214+
length := len(from)
215+
if length == 0 {
216+
return []*To{}
217+
}
218+
219+
// 预获取映射关系以优化批量操作
220+
fromType := reflect.TypeOf((*From)(nil)).Elem()
221+
toType := reflect.TypeOf((*To)(nil)).Elem()
222+
223+
_, hasCache := getFieldMappings(fromType, toType)
224+
if !hasCache {
225+
buildFieldMappings(fromType, toType)
226+
}
227+
228+
// 大 slice 使用并行处理
229+
if length >= parallelThreshold {
230+
return mapDirectPtrSliceParallel[From, To](from)
231+
}
232+
233+
result := make([]*To, length)
163234
for i, v := range from {
164235
if v != nil {
165236
t := MapDirect[From, To](*v)
@@ -169,6 +240,46 @@ func MapDirectPtrSlice[From, To any](from []*From) []*To {
169240
return result
170241
}
171242

243+
// mapDirectPtrSliceParallel 并行指针切片映射
244+
func mapDirectPtrSliceParallel[From, To any](from []*From) []*To {
245+
length := len(from)
246+
result := make([]*To, length)
247+
248+
numCPU := runtime.NumCPU()
249+
numWorkers := length / 100
250+
if numWorkers < 1 {
251+
numWorkers = 1
252+
}
253+
if numWorkers > numCPU {
254+
numWorkers = numCPU
255+
}
256+
257+
chunkSize := length / numWorkers
258+
var wg sync.WaitGroup
259+
wg.Add(numWorkers)
260+
261+
for w := 0; w < numWorkers; w++ {
262+
start := w * chunkSize
263+
end := start + chunkSize
264+
if w == numWorkers-1 {
265+
end = length
266+
}
267+
268+
go func(s, e int) {
269+
defer wg.Done()
270+
for i := s; i < e; i++ {
271+
if from[i] != nil {
272+
t := MapDirect[From, To](*from[i])
273+
result[i] = &t
274+
}
275+
}
276+
}(start, end)
277+
}
278+
279+
wg.Wait()
280+
return result
281+
}
282+
172283
// ============ 错误处理函数 (优化版) ============
173284

174285
// SafeMapDirect 安全映射,忽略错误
@@ -226,18 +337,82 @@ func SafeMapDirectSlice[From, To any](from []From) ([]To, error) {
226337
if from == nil {
227338
return nil, nil
228339
}
229-
230-
result := make([]To, len(from))
340+
341+
length := len(from)
342+
if length == 0 {
343+
return []To{}, nil
344+
}
345+
346+
// 大 slice 使用并行处理
347+
if length >= parallelThreshold {
348+
return safeMapDirectSliceParallel[From, To](from)
349+
}
350+
351+
result := make([]To, length)
231352
for i, v := range from {
232353
r, err := SafeMapDirect[From, To](v)
233354
if err != nil {
234-
return nil, errors.New("map slice failed at index " + string(rune(i+'0')))
355+
return nil, fmt.Errorf("map slice failed at index %d", i)
235356
}
236357
result[i] = r
237358
}
238359
return result, nil
239360
}
240361

362+
// safeMapDirectSliceParallel 并行安全批量映射
363+
func safeMapDirectSliceParallel[From, To any](from []From) ([]To, error) {
364+
length := len(from)
365+
result := make([]To, length)
366+
367+
numCPU := runtime.NumCPU()
368+
numWorkers := length / 100
369+
if numWorkers < 1 {
370+
numWorkers = 1
371+
}
372+
if numWorkers > numCPU {
373+
numWorkers = numCPU
374+
}
375+
376+
chunkSize := length / numWorkers
377+
var wg sync.WaitGroup
378+
wg.Add(numWorkers)
379+
errChan := make(chan error, 1) // 用于接收第一个错误
380+
381+
for w := 0; w < numWorkers; w++ {
382+
start := w * chunkSize
383+
end := start + chunkSize
384+
if w == numWorkers-1 {
385+
end = length
386+
}
387+
388+
go func(s, e int) {
389+
defer wg.Done()
390+
for i := s; i < e; i++ {
391+
r, err := SafeMapDirect[From, To](from[i])
392+
if err != nil {
393+
select {
394+
case errChan <- fmt.Errorf("map slice failed at index %d", i):
395+
default:
396+
}
397+
return
398+
}
399+
result[i] = r
400+
}
401+
}(start, end)
402+
}
403+
404+
wg.Wait()
405+
406+
// 检查是否有错误
407+
select {
408+
case err := <-errChan:
409+
return nil, err
410+
default:
411+
}
412+
413+
return result, nil
414+
}
415+
241416
// ClearFieldMappingCache 清除字段映射缓存
242417
// 用于在需要重新构建映射关系时调用
243418
func ClearFieldMappingCache() {

0 commit comments

Comments
 (0)