@@ -19,19 +19,29 @@ const CONFIG = {
1919 key : process . env . CPA_REDIS_KEY || 'queue' ,
2020 } ,
2121 // 本适配器监听的端口
22- port : parseInt ( process . env . ADAPTER_PORT || '3001 ' ) ,
22+ port : parseInt ( process . env . ADAPTER_PORT || '36871 ' ) ,
2323 // 轮询间隔 (毫秒)
2424 pollInterval : parseInt ( process . env . POLL_INTERVAL || '15000' ) ,
2525 // 内存中保留的最大记录数
26- maxBufferSize : parseInt ( process . env . MAX_BUFFER_SIZE || '5000 ' ) ,
26+ maxBufferSize : parseInt ( process . env . MAX_BUFFER_SIZE || '50000 ' ) ,
2727 // 每次拉取的最大记录数
28- batchSize : parseInt ( process . env . BATCH_SIZE || '100 ' ) ,
28+ batchSize : parseInt ( process . env . BATCH_SIZE || '500 ' ) ,
2929 // 访问 /usage 后是否清空内存缓冲区;true=增量导出,false=保留全量内存快照
3030 clearBufferOnRead : ( process . env . CLEAR_BUFFER_ON_READ || 'false' ) . toLowerCase ( ) === 'true' ,
31+ // 远端 dashboard sync 配置
32+ sync : {
33+ enabled : ( process . env . ENABLE_PERIODIC_SYNC || 'false' ) . toLowerCase ( ) === 'true' ,
34+ dashboardUrl : ( process . env . DASHBOARD_URL || '' ) . trim ( ) . replace ( / \/ $ / , '' ) ,
35+ token : ( process . env . SYNC_TOKEN || process . env . CPA_SECRET_KEY || '' ) . trim ( ) ,
36+ interval : parseInt ( process . env . SYNC_INTERVAL || '6000000' ) , // 默认同步间隔 100 分钟
37+ timeoutMs : parseInt ( process . env . SYNC_TIMEOUT_MS || '300000' ) ,
38+ syncOnStart : ( process . env . SYNC_ON_START || 'false' ) . toLowerCase ( ) === 'true' ,
39+ } ,
3140} ;
3241
3342// 内存缓冲区,用于存放最近拉取的记录
3443let usageBuffer = [ ] ;
44+ let syncInProgress = false ;
3545
3646// 初始化 Redis 客户端
3747const redis = new Redis ( {
@@ -80,6 +90,38 @@ function normalizeRecord(record) {
8090 } ;
8191}
8292
93+ function toPositiveInt ( value , fallback ) {
94+ const parsed = Number . parseInt ( String ( value ?? '' ) , 10 ) ;
95+ if ( ! Number . isFinite ( parsed ) || parsed <= 0 ) return fallback ;
96+ return parsed ;
97+ }
98+
99+ function getSyncConfig ( ) {
100+ const interval = toPositiveInt ( CONFIG . sync . interval , 60000 ) ;
101+ const timeoutMs = toPositiveInt ( CONFIG . sync . timeoutMs , 30000 ) ;
102+
103+ return {
104+ ...CONFIG . sync ,
105+ interval,
106+ timeoutMs,
107+ } ;
108+ }
109+
110+ function getSyncUrl ( ) {
111+ const { dashboardUrl } = getSyncConfig ( ) ;
112+ if ( ! dashboardUrl ) return '' ;
113+
114+ try {
115+ const url = new URL ( '/api/sync' , `${ dashboardUrl } /` ) ;
116+ if ( url . protocol !== 'http:' && url . protocol !== 'https:' ) {
117+ return '' ;
118+ }
119+ return url . toString ( ) ;
120+ } catch {
121+ return '' ;
122+ }
123+ }
124+
83125/**
84126 * 从 Redis 拉取并聚合数据
85127 */
@@ -124,10 +166,77 @@ async function drainQueue() {
124166 }
125167}
126168
169+ async function triggerSync ( reason = 'interval' ) {
170+ const syncConfig = getSyncConfig ( ) ;
171+
172+ if ( ! syncConfig . enabled ) return ;
173+
174+ const syncUrl = getSyncUrl ( ) ;
175+ if ( ! syncUrl ) {
176+ console . error ( '[sync] Invalid or missing DASHBOARD_URL' ) ;
177+ return ;
178+ }
179+
180+ if ( ! syncConfig . token ) {
181+ console . error ( '[sync] Missing SYNC_TOKEN/CRON_SECRET/PASSWORD' ) ;
182+ return ;
183+ }
184+
185+ if ( syncInProgress ) {
186+ console . warn ( '[sync] Previous sync still in progress, skipped' ) ;
187+ return ;
188+ }
189+
190+ syncInProgress = true ;
191+ const controller = new AbortController ( ) ;
192+ const timeoutId = setTimeout ( ( ) => controller . abort ( ) , syncConfig . timeoutMs ) ;
193+
194+ try {
195+ const response = await fetch ( syncUrl , {
196+ method : 'GET' ,
197+ headers : {
198+ Authorization : `Bearer ${ syncConfig . token } ` ,
199+ } ,
200+ signal : controller . signal ,
201+ } ) ;
202+
203+ if ( ! response . ok ) {
204+ console . error ( `[sync] Trigger failed (${ reason } ): ${ response . status } ${ response . statusText } ` ) ;
205+ return ;
206+ }
207+
208+ let result = null ;
209+ try {
210+ result = await response . json ( ) ;
211+ } catch {
212+ result = null ;
213+ }
214+
215+ console . log ( `[sync] Triggered (${ reason } )` , result || { status : response . status } ) ;
216+ } catch ( error ) {
217+ const isTimeout = error instanceof Error && error . name === 'AbortError' ;
218+ console . error ( `[sync] Trigger error (${ reason } ): ${ isTimeout ? 'timeout' : error . message } ` ) ;
219+ } finally {
220+ clearTimeout ( timeoutId ) ;
221+ syncInProgress = false ;
222+ }
223+ }
224+
127225// 定时任务
128226setInterval ( drainQueue , CONFIG . pollInterval ) ;
129227drainQueue ( ) ;
130228
229+ const syncConfig = getSyncConfig ( ) ;
230+ if ( syncConfig . enabled ) {
231+ setInterval ( ( ) => {
232+ triggerSync ( 'interval' ) ;
233+ } , syncConfig . interval ) ;
234+
235+ if ( syncConfig . syncOnStart ) {
236+ triggerSync ( 'startup' ) ;
237+ }
238+ }
239+
131240/**
132241 * 将内存缓冲区的数据转换为旧版 /usage 聚合格式
133242 */
@@ -215,4 +324,12 @@ server.listen(CONFIG.port, () => {
215324 console . log ( `Polling CPA Redis at ${ CONFIG . redis . host } :${ CONFIG . redis . port } ` ) ;
216325 console . log ( `Redis queue key: ${ CONFIG . redis . key } ` ) ;
217326 console . log ( `Clear buffer on read: ${ CONFIG . clearBufferOnRead } ` ) ;
327+
328+ const syncUrl = getSyncUrl ( ) ;
329+ console . log ( `Periodic sync enabled: ${ syncConfig . enabled } ` ) ;
330+ if ( syncConfig . enabled ) {
331+ console . log ( `Periodic sync target: ${ syncUrl || 'invalid DASHBOARD_URL' } ` ) ;
332+ console . log ( `Periodic sync interval: ${ syncConfig . interval } ms` ) ;
333+ console . log ( `Periodic sync on start: ${ syncConfig . syncOnStart } ` ) ;
334+ }
218335} ) ;
0 commit comments