feat: Add UMO Group aggregation support and routing #153
Draft
lekoOwO wants to merge 3 commits into SXP-Simon:main from
Conversation
Co-authored-by: lekoOwO <20151124+lekoOwO@users.noreply.github.com>
Co-authored-by: openai-code-agent[bot] <242516109+Codex@users.noreply.github.com>
Contributor
Reviewer's Guide

Implements UMO Group aggregation support for both daily and incremental analyses, adds scheduler wiring to treat UMO groups as first-class analysis targets with their own source/output UMO configuration and list filtering, updates config and reporting to understand UMO group IDs and aggregated report context, and refactors common analysis logic into reusable helpers.

Sequence diagram for UMO Group daily aggregation and reporting:

sequenceDiagram
actor Admin
participant AutoScheduler
participant AnalysisApplicationService as AnalysisService
participant BotManager
participant Adapter
participant MessageCleanerService as Cleaner
participant LLMAnalyzer
participant HistoryManager
participant ReportDispatcher as Dispatcher
Admin->>AutoScheduler: configure UMO_groups, schedules
AutoScheduler->>AutoScheduler: _get_umo_group_targets(mode_filter=None)
AutoScheduler->>AutoScheduler: _perform_umo_group_analysis_with_timeout(group_id, sources, outputs, mode=traditional)
AutoScheduler->>AutoScheduler: _perform_umo_group_analysis(group_id, sources, outputs, mode)
AutoScheduler->>AnalysisService: execute_daily_analysis_for_sources(group_id, source_umos)
activate AnalysisService
loop for each source_umo in source_umos
AnalysisService->>ConfigManager: parse_umo_string(source_umo)
ConfigManager-->>AnalysisService: platform_id, session_id
AnalysisService->>BotManager: get_adapter(platform_id)
alt adapter exists
BotManager-->>AnalysisService: adapter
AnalysisService->>Adapter: fetch_messages(group_id=session_id, days, max_count)
Adapter-->>AnalysisService: raw_messages
else adapter missing
BotManager-->>AnalysisService: None
AnalysisService->>AnalysisService: skip this source_umo
end
end
AnalysisService->>MessageCleanerService: clean_messages(all_raw_messages, bot_self_ids, filter_commands=True)
MessageCleanerService-->>AnalysisService: unified_messages
AnalysisService->>AnalysisService: _analyze_messages(unified_messages, group_id, platform_id=first_platform, adapter=None, umo_override=group_id)
activate AnalysisService
AnalysisService->>StatisticsService: calculate_group_statistics(unified_messages)
StatisticsService-->>AnalysisService: statistics
AnalysisService->>ConfigManager: get_bot_self_ids()
ConfigManager-->>AnalysisService: bot_self_ids
AnalysisService->>AnalysisDomainService: analyze_user_activity(unified_messages, bot_self_ids)
AnalysisDomainService-->>AnalysisService: user_activity
AnalysisService->>ConfigManager: get_max_user_titles()
ConfigManager-->>AnalysisService: limit
AnalysisService->>AnalysisDomainService: get_top_users(user_activity, limit)
AnalysisDomainService-->>AnalysisService: top_users
AnalysisService->>ConfigManager: get_topic_analysis_enabled(), get_user_title_analysis_enabled(), get_golden_quote_analysis_enabled(), get_chat_quality_analysis_enabled()
ConfigManager-->>AnalysisService: flags
AnalysisService->>StatisticsService: _convert_to_legacy_dict(unified_messages)
StatisticsService-->>AnalysisService: legacy_messages
alt any LLM flag enabled
AnalysisService->>LLMAnalyzer: analyze_all_concurrent(legacy_messages, user_activity, umo=group_id, top_users, flags)
LLMAnalyzer-->>AnalysisService: topics, user_titles, golden_quotes, token_usage, chat_quality_review
else
AnalysisService->>AnalysisService: skip LLM analysis
end
AnalysisService->>HistoryManager: save_analysis(group_id, analysis_result)
HistoryManager-->>AnalysisService: saved
AnalysisService-->>AutoScheduler: {success=True, analysis_result, messages_count, adapter=None, platform_id=first_platform}
deactivate AnalysisService
deactivate AnalysisService
AutoScheduler->>ReportDispatcher: dispatch(dest.group_id, analysis_result, dest.platform_id, report_group_id=group_id) for each dest in outputs
activate Dispatcher
alt output_format=image
Dispatcher->>ReportGenerator: generate_image_report(analysis_result, report_group_id, html_render_func, avatar_url_getter)
ReportGenerator-->>Dispatcher: image_url, html_content
Dispatcher->>Adapter: send_image(group_id, image_url or file)
else output_format=pdf or html or text
Dispatcher->>ReportGenerator: generate_pdf_report or generate_html_report(with report_group_id)
ReportGenerator-->>Dispatcher: path
Dispatcher->>Adapter: send_file_or_text(group_id, path or text)
end
deactivate Dispatcher
AutoScheduler-->>Admin: UMO Group aggregated report delivered
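The per-source aggregation step at the heart of the flow above can be sketched as follows. This is a simplified, self-contained approximation: `parse_umo_string` mirrors the `ConfigManager` method named in the diagram, while `StubAdapter` and `collect_messages_for_sources` are hypothetical stand-ins for the PR's actual service classes and signatures.

```python
# Simplified sketch of the per-source aggregation loop from the sequence
# diagram: parse each source UMO, resolve its platform adapter, fetch
# messages, and pool everything into one combined list for analysis.
# All classes here are hypothetical stand-ins for the PR's services.

def parse_umo_string(umo: str) -> tuple[str, str]:
    """Split 'platform:type:session' (or 'platform:session') into (platform_id, session_id)."""
    parts = umo.split(":")
    return parts[0], parts[-1]

class StubAdapter:
    def __init__(self, platform_id, messages_by_session):
        self.platform_id = platform_id
        self._messages = messages_by_session

    def fetch_messages(self, group_id, days=1, max_count=1000):
        return self._messages.get(group_id, [])

def collect_messages_for_sources(source_umos, adapters):
    """Aggregate raw messages across all source UMOs, skipping any
    source whose platform adapter is unavailable (the 'else' branch
    of the diagram)."""
    combined = []
    for source_umo in source_umos:
        platform_id, session_id = parse_umo_string(source_umo)
        adapter = adapters.get(platform_id)
        if adapter is None:
            continue  # adapter missing: skip this source_umo
        combined.extend(adapter.fetch_messages(group_id=session_id))
    return combined

adapters = {
    "aiocqhttp": StubAdapter("aiocqhttp", {"1001": [{"text": "hi"}, {"text": "yo"}]}),
}
msgs = collect_messages_for_sources(
    ["aiocqhttp:GroupMessage:1001", "telegram:GroupMessage:42"], adapters
)
print(len(msgs))  # the telegram source has no adapter and is skipped, so 2
```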
Sequence diagram for UMO Group incremental aggregation and final report:

sequenceDiagram
actor Admin
participant AutoScheduler
participant AnalysisApplicationService as AnalysisService
participant BotManager
participant IncrementalStore
participant MessageCleanerService as Cleaner
participant LLMAnalyzer
participant ReportDispatcher as Dispatcher
Admin->>AutoScheduler: configure UMO_groups, incremental lists
AutoScheduler->>AutoScheduler: _get_umo_group_targets(mode_filter=incremental)
AutoScheduler->>AutoScheduler: _perform_incremental_analysis_for_umo_group_with_timeout(group_id, source_umos)
AutoScheduler->>AutoScheduler: _perform_incremental_analysis_for_umo_group(group_id, source_umos)
AutoScheduler->>AnalysisService: execute_incremental_analysis_for_sources(group_id, source_umos)
activate AnalysisService
loop for each source_umo in source_umos
AnalysisService->>ConfigManager: parse_umo_string(source_umo)
ConfigManager-->>AnalysisService: platform_id, session_id
AnalysisService->>BotManager: get_adapter(platform_id)
BotManager-->>AnalysisService: adapter or None
alt adapter exists
AnalysisService->>AnalysisService: _build_source_progress_key(group_id, source_umo)
AnalysisService->>IncrementalStore: get_last_analyzed_timestamp(progress_key)
IncrementalStore-->>AnalysisService: last_ts
AnalysisService->>Adapter: fetch_messages(group_id=session_id, days, max_count, since_ts=last_ts)
Adapter-->>AnalysisService: raw_messages
AnalysisService->>MessageCleanerService: clean_messages(raw_messages, bot_self_ids, filter_commands=True)
MessageCleanerService-->>AnalysisService: unified_messages
AnalysisService->>AnalysisService: filter messages with timestamp > last_ts
AnalysisService->>AnalysisService: append to combined_messages, update source_watermarks
else
AnalysisService->>AnalysisService: skip this source_umo
end
end
AnalysisService->>StatisticsService: calculate_group_statistics(combined_messages)
StatisticsService-->>AnalysisService: statistics
AnalysisService->>AnalysisDomainService: analyze_user_activity(combined_messages, bot_self_ids)
AnalysisDomainService-->>AnalysisService: user_activity
AnalysisService->>AnalysisService: _compute_hourly_counts(combined_messages)
AnalysisService-->>AnalysisService: hourly_msg_counts, hourly_char_counts
AnalysisService->>ConfigManager: get_incremental_topics_per_batch(), get_incremental_quotes_per_batch(), LLM flags
ConfigManager-->>AnalysisService: topics_per_batch, quotes_per_batch, flags
AnalysisService->>StatisticsService: _convert_to_legacy_dict(combined_messages)
StatisticsService-->>AnalysisService: legacy_messages
alt any LLM flag enabled
AnalysisService->>LLMAnalyzer: analyze_incremental_concurrent(legacy_messages, umo=group_id_with_platform_hint, topics_per_batch, quotes_per_batch, flags)
LLMAnalyzer-->>AnalysisService: topics, golden_quotes, token_usage, chat_quality_review
else
AnalysisService->>AnalysisService: skip LLM analysis
end
AnalysisService->>AnalysisService: build IncrementalBatch(group_id, stats, user_stats, emoji_stats, topics, quotes, token_usage, chat_quality_review)
AnalysisService->>IncrementalStore: save_batch(batch)
IncrementalStore-->>AnalysisService: saved
loop for each source watermark
AnalysisService->>IncrementalStore: update_last_analyzed_timestamp(progress_key, safe_ts)
IncrementalStore-->>AnalysisService: updated
end
AnalysisService-->>AutoScheduler: {success=True, batch_summary, messages_count, group_id, platform_id_hint}
deactivate AnalysisService
alt incremental_report_immediately enabled
AutoScheduler->>AutoScheduler: _perform_umo_group_analysis_with_timeout(group_id, sources, outputs, mode=incremental)
AutoScheduler->>AnalysisService: execute_incremental_final_report(group_id, platform_id=None, require_adapter=False)
AnalysisService-->>AutoScheduler: {success, analysis_result, messages_count}
AutoScheduler->>ReportDispatcher: dispatch(dest.group_id, analysis_result, dest.platform_id, report_group_id=group_id) for each dest
Dispatcher-->>AutoScheduler: dispatched
else
AutoScheduler->>Admin: incremental batches accumulated only
end
Class diagram for UMO Group config, scheduling, and analysis:

classDiagram
class ConfigManager {
+UMO_GROUP_PREFIX
+UMO_GROUP_ID_INVALID_PATTERN
+is_group_allowed(group_id_or_umo str) bool
+is_umo_group_id(value str) bool
+normalize_umo_group_id(group_id str) str
+parse_umo_string(umo str) (str, str)
+get_umo_groups() list
+get_umo_group_map() dict
+get_umo_group(group_id str) dict
+get_umo_group_sources(group_id str) list~str~
+get_umo_group_outputs(group_id str) list~str~
+find_umo_groups_by_source(source_umo str) list~str~
+get_scheduled_group_list_mode() str
+get_scheduled_group_list() list
+get_incremental_group_list_mode() str
+get_incremental_group_list() list
+get_analysis_days() int
+get_max_messages() int
+get_incremental_safe_limit() int
+get_incremental_min_messages() int
+get_topic_analysis_enabled() bool
+get_user_title_analysis_enabled() bool
+get_golden_quote_analysis_enabled() bool
+get_chat_quality_analysis_enabled() bool
+get_max_user_titles() int
+get_incremental_topics_per_batch() int
+get_incremental_quotes_per_batch() int
+get_output_format() str
+get_bot_self_ids() list~str~
}
class AnalysisApplicationService {
+execute_daily_analysis(platform_id str, group_id str) dict
+execute_daily_analysis_for_sources(group_id str, source_umos list~str~) dict
+_analyze_messages(unified_messages list~UnifiedMessage~, group_id str, platform_id str, adapter Any, umo_override str) dict
+execute_incremental_analysis(platform_id str, group_id str) dict
+execute_incremental_analysis_for_sources(group_id str, source_umos list~str~) dict
+execute_incremental_final_report(group_id str, platform_id str, require_adapter bool) dict
+_build_source_progress_key(group_id str, source_umo str) str
-statistics_service
-analysis_domain_service
-config_manager
-llm_analyzer
-history_manager
-incremental_store
-bot_manager
-llm_semaphore
}
class AutoScheduler {
+_run_scheduled_report()
+_run_incremental_analysis()
+_get_scheduled_targets(mode_filter str) list
+_get_umo_group_targets(mode_filter str) list~dict~
+_normalize_group_entry(value str, defined_ids set~str~) str
+_is_group_selected(group_id str, mode str, configured_ids set~str~) bool
+_prepare_umo_group_sources(source_umos list~str~) list~str~
+_normalize_output_destinations(outputs list~str~) list~dict~
+_perform_auto_analysis_for_group(group_id str, target_platform_id str)
+_perform_auto_analysis_for_group_with_timeout(group_id str, target_platform_id str)
+_perform_umo_group_analysis(group_id str, source_umos list~str~, output_targets list~dict~, mode str)
+_perform_umo_group_analysis_with_timeout(group_id str, source_umos list~str~, output_targets list~dict~, mode str)
+_perform_incremental_analysis_for_group(group_id str, platform_id str)
+_perform_incremental_analysis_for_group_with_timeout(group_id str, platform_id str)
+_perform_incremental_analysis_for_umo_group(group_id str, source_umos list~str~)
+_perform_incremental_analysis_for_umo_group_with_timeout(group_id str, source_umos list~str~)
-config_manager
-analysis_service
-report_dispatcher
-bot_manager
-_terminating bool
}
class ReportDispatcher {
+dispatch(group_id str, analysis_result dict, platform_id str, report_group_id str)
+_dispatch_image(group_id str, analysis_result dict, platform_id str, report_group_id str) bool
+_dispatch_pdf(group_id str, analysis_result dict, platform_id str, report_group_id str) bool
+_dispatch_html(group_id str, analysis_result dict, platform_id str, report_group_id str) bool
-config_manager
-report_generator
-_html_render_func
}
class BotManager {
+get_adapter(platform_id str) Adapter
+is_ready_for_auto_analysis() bool
}
class Adapter {
+platform_id str
+fetch_messages(group_id str, days int, max_count int, since_ts int) list~dict~
+send_image(group_id str, image Any) bool
+send_file_or_text(group_id str, payload Any) bool
}
class IncrementalStore {
+get_last_analyzed_timestamp(progress_key str) int
+update_last_analyzed_timestamp(progress_key str, ts int)
+save_batch(batch IncrementalBatch)
}
class IncrementalBatch {
+group_id str
+timestamp float
+messages_count int
+characters_count int
+hourly_msg_counts dict
+hourly_char_counts dict
+user_stats dict
+emoji_stats dict
+topics list~dict~
+golden_quotes list~dict~
+token_usage dict
+chat_quality_review dict
+last_message_timestamp int
+participant_ids list~str~
+get_summary() dict
}
ConfigManager <.. AutoScheduler : uses
ConfigManager <.. AnalysisApplicationService : uses
ConfigManager <.. ReportDispatcher : uses
AutoScheduler --> AnalysisApplicationService : orchestrates
AutoScheduler --> ReportDispatcher : dispatches_reports
AutoScheduler --> BotManager : checks_adapters
AnalysisApplicationService --> BotManager : fetch_messages
AnalysisApplicationService --> IncrementalStore : read_write_progress
ReportDispatcher --> Adapter : send_outputs
IncrementalStore <.. AnalysisApplicationService
IncrementalBatch <.. AnalysisApplicationService
BotManager o-- Adapter
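The `ConfigManager` additions in the class diagram center on recognizing and resolving UMO group IDs. A rough sketch of how `is_umo_group_id`, `normalize_umo_group_id`, and `find_umo_groups_by_source` might fit together — note that the `umo_group:` prefix and the shape of the group map are assumptions made for illustration, not the PR's actual constant or config schema:

```python
# Hypothetical illustration of the ConfigManager UMO-group helpers.
UMO_GROUP_PREFIX = "umo_group:"  # assumed prefix, not the PR's actual value

def is_umo_group_id(value: str) -> bool:
    """True when the value denotes a virtual UMO group rather than a session."""
    return str(value).startswith(UMO_GROUP_PREFIX)

def normalize_umo_group_id(group_id: str) -> str:
    """Ensure a group ID carries the UMO-group prefix exactly once."""
    gid = str(group_id)
    return gid if gid.startswith(UMO_GROUP_PREFIX) else UMO_GROUP_PREFIX + gid

def find_umo_groups_by_source(source_umo: str, umo_group_map: dict[str, dict]) -> list[str]:
    """Return the IDs of every UMO group that lists source_umo as a source."""
    return [
        gid for gid, group in umo_group_map.items()
        if source_umo in group.get("sources", [])
    ]

groups = {
    "umo_group:all_chats": {
        "sources": ["aiocqhttp:GroupMessage:1001", "telegram:GroupMessage:42"],
        "outputs": ["aiocqhttp:GroupMessage:1001"],
    },
}
print(find_umo_groups_by_source("telegram:GroupMessage:42", groups))
# ['umo_group:all_chats']
```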
Contributor
Hey - I've found 3 issues, and left some high level feedback:

- In `execute_incremental_analysis_for_sources`, avoid the inline `import time` and reuse the existing `time_mod` import for consistency and to prevent redundant imports.
- The scheduler's UMO Group helper methods (`_get_umo_group_targets`, `_normalize_output_destinations`, etc.) pass around loosely-typed `dict`/`object` structures; consider introducing a small dataclass or TypedDict for targets to make the code easier to follow and less error-prone when accessing fields like `group_id`, `mode`, `sources`, and `outputs`.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `execute_incremental_analysis_for_sources`, avoid the inline `import time` and reuse the existing `time_mod` import for consistency and to prevent redundant imports.
- The scheduler’s UMO Group helper methods (`_get_umo_group_targets`, `_normalize_output_destinations`, etc.) pass around loosely-typed `dict`/`object` structures; consider introducing a small dataclass or TypedDict for targets to make the code easier to follow and less error-prone when accessing fields like `group_id`, `mode`, `sources`, and `outputs`.
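The second overall comment could be addressed with a small frozen dataclass. One possible shape — the field names come from the review comment itself, while the class name, mode values, and tuple-based defaults are assumptions for illustration:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class UmoGroupTarget:
    """Hypothetical typed replacement for the loosely-typed dicts passed
    between the scheduler's UMO-group helper methods."""
    group_id: str
    mode: str                      # e.g. "traditional" or "incremental"
    sources: tuple[str, ...] = ()  # source UMO strings
    outputs: tuple[str, ...] = ()  # output destination UMO strings

target = UmoGroupTarget(
    group_id="umo_group:all_chats",
    mode="incremental",
    sources=("aiocqhttp:GroupMessage:1001",),
    outputs=("telegram:GroupMessage:42",),
)
print(target.mode)  # attribute access replaces error-prone dict key lookups
```

Freezing the dataclass also makes targets hashable and safe to share between the scheduler's helper methods without defensive copying.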
## Individual Comments
### Comment 1
<location path="src/application/services/analysis_application_service.py" line_range="868-877" />
<code_context>
+ participant_ids=participant_ids,
+ )
+
+ await self.incremental_store.save_batch(batch)
+
+ import time
+
+ safe_now = int(time.time()) + 60
+ for progress_key, ts in source_watermarks.items():
+ safe_ts = min(ts, safe_now)
</code_context>
<issue_to_address>
**suggestion:** Avoid mixing `time_mod` and a local `import time` in the same function for clarity and consistency.
This function first uses `time_mod.time()` and later does a local `import time` and calls `time.time()`. Please use a single approach (e.g., only `time_mod` or a module‑level `import time`) so the time source stays consistent and easier to mock or trace.
```suggestion
await self.incremental_store.save_batch(batch)
safe_now = int(time_mod.time()) + 60
for progress_key, ts in source_watermarks.items():
safe_ts = min(ts, safe_now)
await self.incremental_store.update_last_analyzed_timestamp(
progress_key, safe_ts
)
```
</issue_to_address>
### Comment 2
<location path="src/infrastructure/config/config_manager.py" line_range="68-73" />
<code_context>
glist = [str(g) for g in self.get_group_list()]
target = str(group_id_or_umo)
+ target_umo_groups = set(self.find_umo_groups_by_source(target))
+
target_simple_id = target.split(":")[-1] if ":" in target else target
</code_context>
<issue_to_address>
**suggestion (performance):** New UMO-group matching path in `is_group_allowed` may be relatively expensive due to repeated scans of all UMO groups.
`target_umo_groups = set(self.find_umo_groups_by_source(target))` causes every `is_group_allowed` call to traverse all configured UMO groups (via `get_umo_group_map`), even when the input contains no UMO-group IDs. Since `is_group_allowed` is used in scheduling paths like `_prepare_umo_group_sources`, this may introduce noticeable overhead at scale. Please either skip this UMO-group lookup when the group list has no UMO-group IDs, or cache the mapping used by `find_umo_groups_by_source` so it isn’t rebuilt on each call.
```suggestion
glist = [str(g) for g in self.get_group_list()]
target = str(group_id_or_umo)
# Only resolve UMO groups by source when there are UMO-group IDs in the group list.
# This avoids repeatedly scanning all configured UMO groups in common non-UMO paths.
if any(self.is_umo_group_id(item) for item in glist):
target_umo_groups = set(self.find_umo_groups_by_source(target))
else:
target_umo_groups = set()
target_simple_id = target.split(":")[-1] if ":" in target else target
```
</issue_to_address>
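Comment 2 offers two remedies; the suggestion block shows the skip-fast path, while the caching alternative could look roughly like the sketch below. This is an illustration only: a real `ConfigManager` would also need to invalidate the index whenever the configuration reloads.

```python
class UmoGroupIndex:
    """Builds a source-UMO -> group-IDs index once, so repeated
    is_group_allowed calls don't rescan every configured UMO group."""

    def __init__(self, umo_group_map: dict[str, dict]):
        self._index: dict[str, list[str]] = {}
        for gid, group in umo_group_map.items():
            for source in group.get("sources", []):
                self._index.setdefault(source, []).append(gid)

    def groups_for_source(self, source_umo: str) -> list[str]:
        # O(1) lookup instead of a scan over all configured groups
        return self._index.get(source_umo, [])

index = UmoGroupIndex({
    "umo_group:a": {"sources": ["p:G:1", "p:G:2"]},
    "umo_group:b": {"sources": ["p:G:2"]},
})
print(index.groups_for_source("p:G:2"))  # ['umo_group:a', 'umo_group:b']
```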
### Comment 3
<location path="src/infrastructure/reporting/dispatcher.py" line_range="43-48" />
<code_context>
"""
trace_id = TraceContext.get()
output_format = self.config_manager.get_output_format()
+ target_group_id = report_group_id or group_id
logger.info(
f"[{trace_id}] 正在分发群 {group_id} 的报告 (格式: {output_format})"
)
</code_context>
<issue_to_address>
**suggestion:** Log message still references the physical target group, not the logical report group ID.
`target_group_id` is now used for content generation, but the log still prints `group_id`, which is the delivery destination. For UMO groups this can be confusing, since the report may originate from a different logical group. Please log both the physical `group_id` and the logical `report_group_id` (when present) so the analysis origin and delivery destination are clearly distinguishable when debugging.
```suggestion
trace_id = TraceContext.get()
output_format = self.config_manager.get_output_format()
target_group_id = report_group_id or group_id
if report_group_id:
logger.info(
f"[{trace_id}] 正在分发报告 (逻辑群: {report_group_id}, 发送目标群: {group_id}, 格式: {output_format})"
)
else:
logger.info(
f"[{trace_id}] 正在分发群 {group_id} 的报告 (格式: {output_format})"
)
```
</issue_to_address>
Contributor
Author

Not tested yet; converting to Draft for now.
Summary by Sourcery
Add support for aggregating analyses across multiple UMO sources into virtual UMO groups and integrate them into existing daily and incremental reporting flows.