forked from james-6-23/codex2api
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpostgres.go
More file actions
3907 lines (3607 loc) · 139 KB
/
postgres.go
File metadata and controls
3907 lines (3607 loc) · 139 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package database
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"log"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/lib/pq"
_ "modernc.org/sqlite"
)
// AccountRow is one account row as stored in the database.
// NOTE(review): the fields appear to mirror the columns of the accounts table
// created in migrate; the scan code is outside this view — confirm there.
type AccountRow struct {
ID int64
Name string
Platform string
Type string
// Credentials is the decoded credentials document. GetCredential,
// GetCredentialInt64Slice and GetCredentialStringSlice read keys from it.
Credentials map[string]interface{}
ProxyURL string
Status string
CooldownReason string
// CooldownUntil is nullable (sql.NullTime).
CooldownUntil sql.NullTime
ErrorMessage string
Enabled bool
Locked bool
CreditEnabled bool
CreditSkipUsageWindow bool
// ScoreBiasOverride and BaseConcurrencyOverride are nullable (sql.NullInt64).
ScoreBiasOverride sql.NullInt64
BaseConcurrencyOverride sql.NullInt64
Tags []string
CreatedAt time.Time
UpdatedAt time.Time
}
// AccountModelCooldownRow is one per-account, per-model cooldown record.
// Its fields match the columns of the account_model_cooldowns table created in
// migrate (account_id, model, reason, reset_at, updated_at).
type AccountModelCooldownRow struct {
AccountID int64
Model string
Reason string
ResetAt time.Time
UpdatedAt time.Time
}
// OptionalInt64Slice pairs an int64 slice with a Set flag.
// NOTE(review): presumably Set distinguishes "not provided" from "provided
// (possibly empty)" in partial updates — confirm against callers.
type OptionalInt64Slice struct {
Set bool
Values []int64
}
// OptionalStringSlice pairs a string slice with a Set flag.
// NOTE(review): presumably Set distinguishes "not provided" from "provided
// (possibly empty)" in partial updates — confirm against callers.
type OptionalStringSlice struct {
Set bool
Values []string
}
// OptionalString pairs a string with a Set flag.
// NOTE(review): presumably Set distinguishes "not provided" from "provided
// (possibly empty)" in partial updates — confirm against callers.
type OptionalString struct {
Set bool
Value string
}
// OptionalNullInt64 pairs a nullable integer with a Set flag, so three states
// can be expressed: not set, set to NULL (Value.Valid == false) and set to a
// number.
// NOTE(review): how callers interpret Set is outside this view — confirm.
type OptionalNullInt64 struct {
Set bool
Value sql.NullInt64
}
// AccountCredentialIndex holds pre-built sets of existing credentials for fast import dedup.
// Each map is used as a set keyed by the credential value.
// NOTE(review): the code that fills and queries these maps is outside this
// view; whether keys are trimmed or otherwise normalised is not shown here.
type AccountCredentialIndex struct {
RefreshTokens map[string]bool
AccessTokens map[string]bool
SessionTokens map[string]bool
AccountIDs map[string]bool
}
// GetCredential returns the credentials entry stored under key as a string.
//
// It returns "" when the credentials map is nil, the key is absent, the value
// is nil, or the value is neither a string nor a float64. String values are
// returned unchanged.
//
// float64 values (the type encoding/json produces for JSON numbers) that hold
// a whole number within the int64 range are rendered as plain decimal digits.
// Formatting them with %v would switch to exponent notation from 1e6 upwards
// (for example "1.712345678e+09"), which is not usable as an identifier or
// timestamp string. Any other float64 keeps the %v rendering.
func (a *AccountRow) GetCredential(key string) string {
	if a.Credentials == nil {
		return ""
	}
	v, ok := a.Credentials[key]
	if !ok || v == nil {
		return ""
	}
	switch val := v.(type) {
	case string:
		return val
	case float64:
		// The range guard keeps the int64 conversion well defined; NaN fails
		// both comparisons and falls through to %v.
		if val >= -(1<<63) && val < 1<<63 {
			if whole := int64(val); float64(whole) == val {
				return fmt.Sprintf("%d", whole)
			}
		}
		return fmt.Sprintf("%v", val)
	default:
		return ""
	}
}
// GetCredentialInt64Slice returns the credentials entry stored under key
// converted by int64SliceFromValue. When the credentials map is nil or the key
// is absent it returns an empty, non-nil slice. A key that is present with a
// nil value is still handed to int64SliceFromValue.
func (a *AccountRow) GetCredentialInt64Slice(key string) []int64 {
	if a.Credentials != nil {
		if raw, found := a.Credentials[key]; found {
			return int64SliceFromValue(raw)
		}
	}
	return []int64{}
}
// GetCredentialStringSlice returns the credentials entry stored under key
// converted by stringSliceFromValue. It returns an empty, non-nil slice when
// the credentials map is nil, the key is absent, or the stored value is nil.
func (a *AccountRow) GetCredentialStringSlice(key string) []string {
	if a.Credentials != nil {
		if raw, found := a.Credentials[key]; found && raw != nil {
			return stringSliceFromValue(raw)
		}
	}
	return []string{}
}
// DB wraps the SQL connection pool together with the in-memory buffer used for
// batched usage-log writes. The original comment describes it as the
// PostgreSQL handle; the driver field and the SQLite branches in New show that
// SQLite is served by the same type.
type DB struct {
conn *sql.DB
driver string
// Buffer for batched usage-log writes.
logBuf []usageLogEntry
logMu sync.Mutex
// logStop is closed by Close; Close then waits on logWg.
logStop chan struct{}
logWg sync.WaitGroup
usageLogMode atomic.Value // string: full|errors|off
// usageLogBatchSize and usageLogFlushInterval are read and written through
// the sync/atomic functions (see SetUsageLogConfig and the getters).
usageLogBatchSize int64
usageLogFlushInterval int64 // ns
// logFlushNotify is created with capacity 1 in New; notifyLogFlush performs
// a non-blocking send on it.
logFlushNotify chan struct{}
accountInsertMu sync.Mutex
}
// Usage-log configuration: the accepted mode strings, the defaults applied by
// New, and the bounds enforced by NormalizeUsageLogBatchSize and
// NormalizeUsageLogFlushIntervalSeconds.
const (
// Mode values accepted by NormalizeUsageLogMode.
UsageLogModeFull = "full"
UsageLogModeErrors = "errors"
UsageLogModeOff = "off"
// Defaults passed to SetUsageLogConfig when a DB is created.
defaultUsageLogMode = UsageLogModeFull
defaultUsageLogBatchSize = 200
defaultUsageLogFlushIntervalSeconds = 5
// Inclusive bounds; values below the minimum fall back to the default and
// values above the maximum are clamped to it.
minUsageLogBatchSize = 1
maxUsageLogBatchSize = 10000
minUsageLogFlushIntervalSeconds = 1
maxUsageLogFlushIntervalSeconds = 300
)
// ErrDuplicateAccountCredential is the sentinel error for a duplicate account
// credential. NOTE(review): the code that returns it is outside this view.
var ErrDuplicateAccountCredential = errors.New("duplicate account credential")
func NormalizeUsageLogMode(mode string) string {
switch strings.ToLower(strings.TrimSpace(mode)) {
case "", UsageLogModeFull:
return UsageLogModeFull
case UsageLogModeErrors:
return UsageLogModeErrors
case UsageLogModeOff:
return UsageLogModeOff
default:
return UsageLogModeFull
}
}
// NormalizeUsageLogBatchSize bounds a requested batch size. Values below
// minUsageLogBatchSize are replaced by defaultUsageLogBatchSize (not by the
// minimum), values above maxUsageLogBatchSize are clamped to the maximum, and
// anything in between is returned unchanged.
func NormalizeUsageLogBatchSize(n int) int {
	switch {
	case n < minUsageLogBatchSize:
		return defaultUsageLogBatchSize
	case n > maxUsageLogBatchSize:
		return maxUsageLogBatchSize
	default:
		return n
	}
}
// NormalizeUsageLogFlushIntervalSeconds bounds a requested flush interval in
// seconds. Values below minUsageLogFlushIntervalSeconds are replaced by
// defaultUsageLogFlushIntervalSeconds; larger values are capped at
// maxUsageLogFlushIntervalSeconds.
func NormalizeUsageLogFlushIntervalSeconds(n int) int {
	if n < minUsageLogFlushIntervalSeconds {
		return defaultUsageLogFlushIntervalSeconds
	}
	seconds := n
	if seconds > maxUsageLogFlushIntervalSeconds {
		seconds = maxUsageLogFlushIntervalSeconds
	}
	return seconds
}
// usageLogEntry is one buffered usage-log record; DB.logBuf holds a slice of
// them until they are written out.
// NOTE(review): the fields appear to correspond to the usage_logs columns
// created in migrate; the INSERT that consumes them is outside this view.
type usageLogEntry struct {
AccountID int64
Endpoint string
Model string
EffectiveModel string
PromptTokens int
CompletionTokens int
TotalTokens int
StatusCode int
DurationMs int
InputTokens int
OutputTokens int
ReasoningTokens int
FirstTokenMs int
ReasoningEffort string
InboundEndpoint string
UpstreamEndpoint string
Stream bool
CachedTokens int
ServiceTier string
// API key attribution.
APIKeyID int64
APIKeyName string
APIKeyMasked string
// Image request metadata.
ImageCount int
ImageWidth int
ImageHeight int
ImageBytes int
ImageFormat string
ImageSize string
// Billing amounts.
AccountBilled float64
UserBilled float64
// Retry and upstream error details.
IsRetryAttempt bool
AttemptIndex int
UpstreamErrorKind string
ErrorMessage string
}
// New opens the database, applies connection-pool settings, runs the schema
// migration, makes sure the usage_stats_baseline table and its single row
// exist, and starts the background usage-log flusher.
//
// driver is normalised with normalizeDriver and dsn is handed to sql.Open
// unchanged. schema is optional and only used for PostgreSQL: when its first
// element is non-blank, that schema is created if missing and search_path is
// set on the connection that serves the statement; when it is empty the
// database's default search_path is kept.
//
// If any step after sql.Open fails, the pool is closed before the error is
// returned. The log flusher is started only once every setup step has
// succeeded, so an error return leaves neither connections nor a goroutine
// behind.
func New(driver string, dsn string, schema ...string) (*DB, error) {
	driver = normalizeDriver(driver)
	pgSchema := ""
	if len(schema) > 0 {
		pgSchema = strings.TrimSpace(schema[0])
	}

	conn, err := sql.Open(driver, dsn)
	if err != nil {
		return nil, fmt.Errorf("连接数据库失败: %w", err)
	}
	// Close the pool on every failure path below; ready is flipped just before
	// the successful return.
	ready := false
	defer func() {
		if !ready {
			_ = conn.Close()
		}
	}()

	// ==================== Connection pool tuning ====================
	if driver == "sqlite" {
		conn.SetMaxOpenConns(1)
		conn.SetMaxIdleConns(1)
	} else {
		// High-concurrency workload: many token refreshes, frontend queries and
		// usage-log writes run in parallel.
		conn.SetMaxOpenConns(100)                 // higher cap on open connections for more concurrency
		conn.SetMaxIdleConns(50)                  // more idle connections kept warm
		conn.SetConnMaxLifetime(60 * time.Minute) // longer maximum connection lifetime
		conn.SetConnMaxIdleTime(30 * time.Minute) // longer maximum idle time
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := conn.PingContext(ctx); err != nil {
		return nil, fmt.Errorf("数据库连接测试失败: %w", err)
	}

	db := &DB{
		conn:           conn,
		driver:         driver,
		logStop:        make(chan struct{}),
		logFlushNotify: make(chan struct{}, 1),
	}
	db.SetUsageLogConfig(defaultUsageLogMode, defaultUsageLogBatchSize, defaultUsageLogFlushIntervalSeconds)

	if db.isSQLite() {
		if err := db.configureSQLite(ctx); err != nil {
			return nil, fmt.Errorf("配置 SQLite 失败: %w", err)
		}
	} else {
		// PostgreSQL: set the session time zone to UTC so NOW() and time
		// literals agree.
		// NOTE(review): a SET issued through *sql.DB only affects the pooled
		// connection that happens to run it; confirm the DSN also pins the
		// time zone for the other connections.
		if _, err := conn.ExecContext(ctx, "SET timezone = 'UTC'"); err != nil {
			return nil, fmt.Errorf("设置数据库时区失败: %w", err)
		}
		// Custom schema: make sure it exists and that search_path is in effect
		// for this session. search_path is already set on every connection via
		// the DSN (options=-c search_path=...); this is only an idempotent
		// CREATE SCHEMA + SET fallback so a first deployment creates the schema
		// automatically.
		if pgSchema != "" {
			quoted := pq.QuoteIdentifier(pgSchema)
			if _, err := conn.ExecContext(ctx, "CREATE SCHEMA IF NOT EXISTS "+quoted); err != nil {
				return nil, fmt.Errorf("创建数据库 schema 失败: %w", err)
			}
			if _, err := conn.ExecContext(ctx, "SET search_path TO "+quoted+", public"); err != nil {
				return nil, fmt.Errorf("设置 search_path 失败: %w", err)
			}
		}
	}

	if err := db.migrate(ctx); err != nil {
		return nil, fmt.Errorf("数据库迁移失败: %w", err)
	}

	baselineInsert := `
INSERT INTO usage_stats_baseline (id) VALUES (1) ON CONFLICT DO NOTHING
`
	if db.isSQLite() {
		baselineInsert = `
INSERT OR IGNORE INTO usage_stats_baseline (id) VALUES (1)
`
	}
	_, err = db.conn.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS usage_stats_baseline (
id INTEGER PRIMARY KEY DEFAULT 1 CHECK (id = 1),
total_requests BIGINT NOT NULL DEFAULT 0,
total_tokens BIGINT NOT NULL DEFAULT 0,
prompt_tokens BIGINT NOT NULL DEFAULT 0,
completion_tokens BIGINT NOT NULL DEFAULT 0,
cached_tokens BIGINT NOT NULL DEFAULT 0,
cache_hit_requests BIGINT NOT NULL DEFAULT 0,
first_token_ms_sum DOUBLE PRECISION NOT NULL DEFAULT 0,
first_token_samples BIGINT NOT NULL DEFAULT 0,
account_billed DOUBLE PRECISION NOT NULL DEFAULT 0,
user_billed DOUBLE PRECISION NOT NULL DEFAULT 0
)
`)
	if err != nil {
		return nil, fmt.Errorf("创建 usage_stats_baseline 表失败: %w", err)
	}
	// Make sure the single baseline row exists.
	_, err = db.conn.ExecContext(ctx, baselineInsert)
	if err != nil {
		return nil, fmt.Errorf("初始化 usage_stats_baseline 失败: %w", err)
	}
	if err := db.ensureUsageStatsBaselineBillingColumns(ctx); err != nil {
		return nil, err
	}

	// Start the background batch writer last: nothing above needs it, and
	// starting it earlier would leave it running if a later step failed.
	db.startLogFlusher()
	ready = true
	return db, nil
}
// ensureUsageStatsBaselineBillingColumns adds the billing, cache-hit and
// first-token columns to usage_stats_baseline when they are missing.
//
// PostgreSQL gets a single script of ADD COLUMN IF NOT EXISTS statements. For
// SQLite the existing columns are read through sqliteTableColumns and one
// ALTER TABLE is issued per missing column. The first error encountered is
// returned unchanged.
func (db *DB) ensureUsageStatsBaselineBillingColumns(ctx context.Context) error {
	if !db.isSQLite() {
		_, err := db.conn.ExecContext(ctx, `
ALTER TABLE usage_stats_baseline ADD COLUMN IF NOT EXISTS account_billed DOUBLE PRECISION NOT NULL DEFAULT 0;
ALTER TABLE usage_stats_baseline ADD COLUMN IF NOT EXISTS user_billed DOUBLE PRECISION NOT NULL DEFAULT 0;
ALTER TABLE usage_stats_baseline ADD COLUMN IF NOT EXISTS cache_hit_requests BIGINT NOT NULL DEFAULT 0;
ALTER TABLE usage_stats_baseline ADD COLUMN IF NOT EXISTS first_token_ms_sum DOUBLE PRECISION NOT NULL DEFAULT 0;
ALTER TABLE usage_stats_baseline ADD COLUMN IF NOT EXISTS first_token_samples BIGINT NOT NULL DEFAULT 0;
`)
		return err
	}

	existing, err := db.sqliteTableColumns(ctx, "usage_stats_baseline")
	if err != nil {
		return err
	}

	type columnSpec struct {
		name string
		def  string
	}
	wanted := []columnSpec{
		{name: "account_billed", def: "REAL NOT NULL DEFAULT 0"},
		{name: "user_billed", def: "REAL NOT NULL DEFAULT 0"},
		{name: "cache_hit_requests", def: "INTEGER NOT NULL DEFAULT 0"},
		{name: "first_token_ms_sum", def: "REAL NOT NULL DEFAULT 0"},
		{name: "first_token_samples", def: "INTEGER NOT NULL DEFAULT 0"},
	}
	for _, spec := range wanted {
		if _, present := existing[spec.name]; present {
			continue
		}
		stmt := fmt.Sprintf("ALTER TABLE usage_stats_baseline ADD COLUMN %s %s", spec.name, spec.def)
		if _, execErr := db.conn.ExecContext(ctx, stmt); execErr != nil {
			return execErr
		}
	}
	return nil
}
// Close shuts the DB down: it closes logStop, waits on logWg, calls flushLogs
// one more time and then closes the connection pool, returning the pool's
// Close error.
// NOTE(review): close(db.logStop) panics if the channel is already closed, so
// calling Close twice on the same DB panics; there is also no nil-receiver
// guard here, unlike the usage-log getters.
func (db *DB) Close() error {
// Stop batched writing and drain the buffer.
close(db.logStop)
db.logWg.Wait()
db.flushLogs() // final flush
return db.conn.Close()
}
// SetUsageLogConfig stores the usage-log mode, batch size and flush interval
// after passing each through its Normalize* function. The interval is given in
// seconds and stored as nanoseconds. A nil receiver is a no-op.
func (db *DB) SetUsageLogConfig(mode string, batchSize int, flushIntervalSeconds int) {
	if db == nil {
		return
	}
	normalizedMode := NormalizeUsageLogMode(mode)
	size := int64(NormalizeUsageLogBatchSize(batchSize))
	interval := time.Duration(NormalizeUsageLogFlushIntervalSeconds(flushIntervalSeconds)) * time.Second

	db.usageLogMode.Store(normalizedMode)
	atomic.StoreInt64(&db.usageLogBatchSize, size)
	atomic.StoreInt64(&db.usageLogFlushInterval, int64(interval))
}
// GetUsageLogMode returns the stored usage-log mode, normalised. It returns
// defaultUsageLogMode when the receiver is nil or when no non-empty string has
// been stored yet.
func (db *DB) GetUsageLogMode() string {
	if db != nil {
		stored, isString := db.usageLogMode.Load().(string)
		if isString && stored != "" {
			return NormalizeUsageLogMode(stored)
		}
	}
	return defaultUsageLogMode
}
// GetUsageLogBatchSize returns the stored batch size passed through
// NormalizeUsageLogBatchSize, or defaultUsageLogBatchSize for a nil receiver.
func (db *DB) GetUsageLogBatchSize() int {
	if db == nil {
		return defaultUsageLogBatchSize
	}
	stored := atomic.LoadInt64(&db.usageLogBatchSize)
	return NormalizeUsageLogBatchSize(int(stored))
}
// GetUsageLogFlushIntervalSeconds returns the stored flush interval in whole
// seconds, normalised. It returns defaultUsageLogFlushIntervalSeconds when the
// receiver is nil or the stored interval is not positive.
func (db *DB) GetUsageLogFlushIntervalSeconds() int {
	if db != nil {
		interval := time.Duration(atomic.LoadInt64(&db.usageLogFlushInterval))
		if interval > 0 {
			return NormalizeUsageLogFlushIntervalSeconds(int(interval / time.Second))
		}
	}
	return defaultUsageLogFlushIntervalSeconds
}
// getUsageLogFlushInterval returns the stored flush interval as a Duration.
// The default interval is returned when the receiver is nil or the stored
// value is not positive. Unlike the exported getter, the stored value is
// returned as is, without normalisation.
func (db *DB) getUsageLogFlushInterval() time.Duration {
	fallback := time.Duration(defaultUsageLogFlushIntervalSeconds) * time.Second
	if db == nil {
		return fallback
	}
	if stored := time.Duration(atomic.LoadInt64(&db.usageLogFlushInterval)); stored > 0 {
		return stored
	}
	return fallback
}
// shouldStoreUsageLog reports whether a usage log should be kept under the
// current mode: never in "off" mode, only for a non-nil input with
// StatusCode >= 400 in "errors" mode, and always otherwise.
func (db *DB) shouldStoreUsageLog(input *UsageLogInput) bool {
	mode := db.GetUsageLogMode()
	if mode == UsageLogModeOff {
		return false
	}
	if mode == UsageLogModeErrors {
		return input != nil && input.StatusCode >= 400
	}
	return true
}
// notifyLogFlush performs a non-blocking send on logFlushNotify. The signal is
// dropped when the channel cannot accept it right away. A nil receiver or a
// nil channel is a no-op.
func (db *DB) notifyLogFlush() {
	if db == nil {
		return
	}
	notify := db.logFlushNotify
	if notify == nil {
		return
	}
	select {
	case notify <- struct{}{}:
	default:
	}
}
// migrate creates or upgrades the schema.
//
// For SQLite it delegates to migrateSQLite. For PostgreSQL it runs two steps:
//
//  1. One multi-statement script of CREATE TABLE / CREATE INDEX IF NOT EXISTS
//     and ALTER TABLE ... ADD COLUMN IF NOT EXISTS statements, executed with
//     the caller's ctx. The script also contains an UPDATE that rewrites
//     accounts whose error_message is 'deleted' to status 'deleted'.
//  2. A DO block that converts every "timestamp without time zone" column of
//     accounts, usage_logs, api_keys, proxies and account_events in the
//     current schema to TIMESTAMPTZ. This step runs under its own 5-minute
//     context derived from context.Background(), so cancelling or timing out
//     ctx does not stop it.
//
// The first error from either step is returned unchanged.
func (db *DB) migrate(ctx context.Context) error {
if db.isSQLite() {
return db.migrateSQLite(ctx)
}
// The script text is sent to the server as is; the "--" line inside it is an
// SQL comment, not a Go comment.
query := `
CREATE TABLE IF NOT EXISTS accounts (
id SERIAL PRIMARY KEY,
name VARCHAR(255) DEFAULT '',
platform VARCHAR(50) DEFAULT 'openai',
type VARCHAR(50) DEFAULT 'oauth',
credentials JSONB NOT NULL DEFAULT '{}',
proxy_url VARCHAR(500) DEFAULT '',
status VARCHAR(50) DEFAULT 'active',
error_message TEXT DEFAULT '',
deleted_at TIMESTAMPTZ NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
ALTER TABLE accounts ADD COLUMN IF NOT EXISTS cooldown_reason VARCHAR(50) DEFAULT '';
ALTER TABLE accounts ADD COLUMN IF NOT EXISTS cooldown_until TIMESTAMPTZ NULL;
ALTER TABLE accounts ADD COLUMN IF NOT EXISTS enabled BOOLEAN DEFAULT TRUE;
ALTER TABLE accounts ADD COLUMN IF NOT EXISTS locked BOOLEAN DEFAULT FALSE;
ALTER TABLE accounts ADD COLUMN IF NOT EXISTS score_bias_override INT NULL;
ALTER TABLE accounts ADD COLUMN IF NOT EXISTS base_concurrency_override INT NULL;
ALTER TABLE accounts ADD COLUMN IF NOT EXISTS deleted_at TIMESTAMPTZ NULL;
ALTER TABLE accounts ADD COLUMN IF NOT EXISTS image_quota_remaining INT NULL;
ALTER TABLE accounts ADD COLUMN IF NOT EXISTS image_quota_total INT NULL;
ALTER TABLE accounts ADD COLUMN IF NOT EXISTS today_used_count INT DEFAULT 0;
ALTER TABLE accounts ADD COLUMN IF NOT EXISTS image_quota_reset_at TIMESTAMPTZ NULL;
ALTER TABLE accounts ADD COLUMN IF NOT EXISTS tags JSONB DEFAULT '[]'::jsonb;
ALTER TABLE accounts ADD COLUMN IF NOT EXISTS credit_enabled BOOLEAN DEFAULT FALSE;
ALTER TABLE accounts ADD COLUMN IF NOT EXISTS credit_skip_usage_window BOOLEAN DEFAULT FALSE;
CREATE TABLE IF NOT EXISTS account_groups (
id SERIAL PRIMARY KEY,
name VARCHAR(80) UNIQUE NOT NULL,
description TEXT DEFAULT '',
color VARCHAR(20) DEFAULT '',
sort_order INT DEFAULT 0,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
ALTER TABLE account_groups ADD COLUMN IF NOT EXISTS description TEXT DEFAULT '';
ALTER TABLE account_groups ADD COLUMN IF NOT EXISTS color VARCHAR(20) DEFAULT '';
ALTER TABLE account_groups ADD COLUMN IF NOT EXISTS sort_order INT DEFAULT 0;
ALTER TABLE account_groups ADD COLUMN IF NOT EXISTS created_at TIMESTAMPTZ DEFAULT NOW();
ALTER TABLE account_groups ADD COLUMN IF NOT EXISTS updated_at TIMESTAMPTZ DEFAULT NOW();
CREATE TABLE IF NOT EXISTS account_group_members (
account_id BIGINT NOT NULL,
group_id BIGINT NOT NULL,
PRIMARY KEY (account_id, group_id)
);
CREATE INDEX IF NOT EXISTS idx_account_group_members_group ON account_group_members(group_id);
CREATE INDEX IF NOT EXISTS idx_account_group_members_account ON account_group_members(account_id);
UPDATE accounts
SET status = 'deleted',
error_message = '',
cooldown_reason = '',
cooldown_until = NULL,
deleted_at = COALESCE(deleted_at, updated_at, NOW()),
updated_at = NOW()
WHERE status <> 'deleted' AND COALESCE(error_message, '') = 'deleted';
CREATE INDEX IF NOT EXISTS idx_accounts_status ON accounts(status);
CREATE INDEX IF NOT EXISTS idx_accounts_platform ON accounts(platform);
CREATE INDEX IF NOT EXISTS idx_accounts_cooldown_until ON accounts(cooldown_until);
CREATE TABLE IF NOT EXISTS usage_logs (
id SERIAL PRIMARY KEY,
account_id INT DEFAULT 0,
endpoint VARCHAR(100) DEFAULT '',
model VARCHAR(100) DEFAULT '',
prompt_tokens INT DEFAULT 0,
completion_tokens INT DEFAULT 0,
total_tokens INT DEFAULT 0,
status_code INT DEFAULT 0,
duration_ms INT DEFAULT 0,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_usage_logs_created_at ON usage_logs(created_at);
CREATE INDEX IF NOT EXISTS idx_usage_logs_account_id ON usage_logs(account_id);
CREATE INDEX IF NOT EXISTS idx_usage_logs_created_status ON usage_logs(created_at, status_code);
CREATE INDEX IF NOT EXISTS idx_usage_logs_account_status ON usage_logs(account_id, status_code);
-- 增强字段(向后兼容 ALTER)
ALTER TABLE usage_logs ADD COLUMN IF NOT EXISTS input_tokens INT DEFAULT 0;
ALTER TABLE usage_logs ADD COLUMN IF NOT EXISTS output_tokens INT DEFAULT 0;
ALTER TABLE usage_logs ADD COLUMN IF NOT EXISTS reasoning_tokens INT DEFAULT 0;
ALTER TABLE usage_logs ADD COLUMN IF NOT EXISTS first_token_ms INT DEFAULT 0;
ALTER TABLE usage_logs ADD COLUMN IF NOT EXISTS reasoning_effort VARCHAR(20) DEFAULT '';
ALTER TABLE usage_logs ADD COLUMN IF NOT EXISTS effective_model VARCHAR(100) DEFAULT '';
ALTER TABLE usage_logs ADD COLUMN IF NOT EXISTS inbound_endpoint VARCHAR(100) DEFAULT '';
ALTER TABLE usage_logs ADD COLUMN IF NOT EXISTS upstream_endpoint VARCHAR(100) DEFAULT '';
ALTER TABLE usage_logs ADD COLUMN IF NOT EXISTS stream BOOLEAN DEFAULT false;
ALTER TABLE usage_logs ADD COLUMN IF NOT EXISTS cached_tokens INT DEFAULT 0;
ALTER TABLE usage_logs ADD COLUMN IF NOT EXISTS service_tier VARCHAR(20) DEFAULT '';
ALTER TABLE usage_logs ADD COLUMN IF NOT EXISTS api_key_id INT DEFAULT 0;
ALTER TABLE usage_logs ADD COLUMN IF NOT EXISTS api_key_name VARCHAR(255) DEFAULT '';
ALTER TABLE usage_logs ADD COLUMN IF NOT EXISTS api_key_masked VARCHAR(64) DEFAULT '';
ALTER TABLE usage_logs ADD COLUMN IF NOT EXISTS image_count INT DEFAULT 0;
ALTER TABLE usage_logs ADD COLUMN IF NOT EXISTS image_width INT DEFAULT 0;
ALTER TABLE usage_logs ADD COLUMN IF NOT EXISTS image_height INT DEFAULT 0;
ALTER TABLE usage_logs ADD COLUMN IF NOT EXISTS image_bytes INT DEFAULT 0;
ALTER TABLE usage_logs ADD COLUMN IF NOT EXISTS image_format VARCHAR(20) DEFAULT '';
ALTER TABLE usage_logs ADD COLUMN IF NOT EXISTS image_size VARCHAR(32) DEFAULT '';
ALTER TABLE usage_logs ADD COLUMN IF NOT EXISTS account_billed DOUBLE PRECISION DEFAULT 0;
ALTER TABLE usage_logs ADD COLUMN IF NOT EXISTS user_billed DOUBLE PRECISION DEFAULT 0;
ALTER TABLE usage_logs ADD COLUMN IF NOT EXISTS is_retry_attempt BOOLEAN DEFAULT FALSE;
ALTER TABLE usage_logs ADD COLUMN IF NOT EXISTS attempt_index INT DEFAULT 0;
ALTER TABLE usage_logs ADD COLUMN IF NOT EXISTS upstream_error_kind VARCHAR(64) DEFAULT '';
ALTER TABLE usage_logs ADD COLUMN IF NOT EXISTS error_message TEXT DEFAULT '';
CREATE INDEX IF NOT EXISTS idx_usage_logs_api_key_created_at ON usage_logs(api_key_id, created_at);
CREATE TABLE IF NOT EXISTS api_keys (
id SERIAL PRIMARY KEY,
name VARCHAR(255) DEFAULT '',
key VARCHAR(255) NOT NULL UNIQUE,
quota_limit DOUBLE PRECISION DEFAULT 0,
quota_used DOUBLE PRECISION DEFAULT 0,
expires_at TIMESTAMPTZ NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
ALTER TABLE api_keys ADD COLUMN IF NOT EXISTS quota_limit DOUBLE PRECISION DEFAULT 0;
ALTER TABLE api_keys ADD COLUMN IF NOT EXISTS quota_used DOUBLE PRECISION DEFAULT 0;
ALTER TABLE api_keys ADD COLUMN IF NOT EXISTS expires_at TIMESTAMPTZ NULL;
CREATE INDEX IF NOT EXISTS idx_api_keys_expires_at ON api_keys(expires_at);
ALTER TABLE api_keys ADD COLUMN IF NOT EXISTS allowed_group_ids JSONB DEFAULT '[]'::jsonb;
CREATE TABLE IF NOT EXISTS system_settings (
id INTEGER PRIMARY KEY DEFAULT 1 CHECK (id = 1),
site_name TEXT DEFAULT 'CodexProxy',
site_logo TEXT DEFAULT '',
max_concurrency INT DEFAULT 2,
global_rpm INT DEFAULT 0,
test_model VARCHAR(100) DEFAULT 'gpt-5.4',
test_concurrency INT DEFAULT 50,
proxy_url VARCHAR(500) DEFAULT '',
pg_max_conns INT DEFAULT 50,
redis_pool_size INT DEFAULT 30,
auto_clean_unauthorized BOOLEAN DEFAULT FALSE,
auto_clean_rate_limited BOOLEAN DEFAULT FALSE,
background_refresh_interval_minutes INT DEFAULT 2,
usage_probe_max_age_minutes INT DEFAULT 10,
recovery_probe_interval_minutes INT DEFAULT 30,
scheduler_mode VARCHAR(20) DEFAULT 'round_robin'
);
CREATE TABLE IF NOT EXISTS account_model_cooldowns (
account_id BIGINT NOT NULL,
model VARCHAR(100) NOT NULL,
reason VARCHAR(64) DEFAULT '',
reset_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (account_id, model)
);
CREATE INDEX IF NOT EXISTS idx_account_model_cooldowns_reset_at ON account_model_cooldowns(reset_at);
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS site_name TEXT DEFAULT 'CodexProxy';
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS site_logo TEXT DEFAULT '';
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS pg_max_conns INT DEFAULT 50;
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS redis_pool_size INT DEFAULT 30;
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS auto_clean_unauthorized BOOLEAN DEFAULT FALSE;
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS auto_clean_rate_limited BOOLEAN DEFAULT FALSE;
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS admin_secret VARCHAR(255) DEFAULT '';
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS auto_clean_full_usage BOOLEAN DEFAULT FALSE;
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS proxy_pool_enabled BOOLEAN DEFAULT FALSE;
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS fast_scheduler_enabled BOOLEAN DEFAULT FALSE;
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS max_retries INT DEFAULT 2;
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS allow_remote_migration BOOLEAN DEFAULT FALSE;
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS max_rate_limit_retries INT DEFAULT 1;
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS auto_clean_error BOOLEAN DEFAULT FALSE;
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS auto_clean_expired BOOLEAN DEFAULT FALSE;
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS model_mapping TEXT DEFAULT '{}';
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS background_refresh_interval_minutes INT DEFAULT 2;
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS usage_probe_max_age_minutes INT DEFAULT 10;
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS recovery_probe_interval_minutes INT DEFAULT 30;
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS scheduler_mode VARCHAR(20) DEFAULT 'round_robin';
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS resin_url TEXT DEFAULT '';
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS resin_platform_name TEXT DEFAULT '';
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS prompt_filter_enabled BOOLEAN DEFAULT FALSE;
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS prompt_filter_mode VARCHAR(20) DEFAULT 'monitor';
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS prompt_filter_threshold INT DEFAULT 50;
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS prompt_filter_strict_threshold INT DEFAULT 90;
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS prompt_filter_log_matches BOOLEAN DEFAULT TRUE;
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS prompt_filter_max_text_length INT DEFAULT 81920;
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS prompt_filter_sensitive_words TEXT DEFAULT '';
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS prompt_filter_custom_patterns TEXT DEFAULT '[]';
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS prompt_filter_disabled_patterns TEXT DEFAULT '[]';
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS client_compat_mode VARCHAR(20) DEFAULT 'preserve';
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS codex_min_cli_version VARCHAR(32) DEFAULT '0.118.0';
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS usage_log_mode VARCHAR(20) DEFAULT 'full';
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS usage_log_batch_size INT DEFAULT 200;
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS usage_log_flush_interval_seconds INT DEFAULT 5;
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS stream_flush_policy VARCHAR(20) DEFAULT 'immediate';
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS stream_flush_interval_ms INT DEFAULT 20;
ALTER TABLE system_settings ADD COLUMN IF NOT EXISTS image_storage_config TEXT DEFAULT '{}';
CREATE TABLE IF NOT EXISTS prompt_filter_logs (
id SERIAL PRIMARY KEY,
created_at TIMESTAMPTZ DEFAULT NOW(),
source VARCHAR(50) DEFAULT '',
endpoint VARCHAR(100) DEFAULT '',
model VARCHAR(100) DEFAULT '',
action VARCHAR(20) DEFAULT '',
mode VARCHAR(20) DEFAULT '',
score INT DEFAULT 0,
threshold_value INT DEFAULT 0,
matched_patterns TEXT DEFAULT '[]',
text_preview TEXT DEFAULT '',
api_key_id INT DEFAULT 0,
api_key_name VARCHAR(255) DEFAULT '',
api_key_masked VARCHAR(64) DEFAULT '',
client_ip VARCHAR(64) DEFAULT '',
error_code VARCHAR(100) DEFAULT ''
);
CREATE INDEX IF NOT EXISTS idx_prompt_filter_logs_created_at ON prompt_filter_logs(created_at);
CREATE INDEX IF NOT EXISTS idx_prompt_filter_logs_action_created_at ON prompt_filter_logs(action, created_at);
CREATE TABLE IF NOT EXISTS model_registry (
id VARCHAR(100) PRIMARY KEY,
enabled BOOLEAN DEFAULT TRUE,
category VARCHAR(50) DEFAULT 'codex',
source VARCHAR(50) DEFAULT 'manual',
pro_only BOOLEAN DEFAULT FALSE,
api_key_auth_available BOOLEAN DEFAULT TRUE,
last_seen_at TIMESTAMPTZ NULL,
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS model_registry_sync (
id INTEGER PRIMARY KEY DEFAULT 1 CHECK (id = 1),
source_url TEXT DEFAULT '',
last_synced_at TIMESTAMPTZ NULL
);
CREATE TABLE IF NOT EXISTS proxies (
id SERIAL PRIMARY KEY,
url VARCHAR(500) NOT NULL UNIQUE,
label VARCHAR(255) DEFAULT '',
enabled BOOLEAN DEFAULT TRUE,
created_at TIMESTAMPTZ DEFAULT NOW()
);
ALTER TABLE proxies ADD COLUMN IF NOT EXISTS test_ip VARCHAR(100) DEFAULT '';
ALTER TABLE proxies ADD COLUMN IF NOT EXISTS test_location VARCHAR(255) DEFAULT '';
ALTER TABLE proxies ADD COLUMN IF NOT EXISTS test_latency_ms INT DEFAULT 0;
CREATE TABLE IF NOT EXISTS account_events (
id SERIAL PRIMARY KEY,
account_id INT NOT NULL DEFAULT 0,
event_type VARCHAR(20) NOT NULL,
source VARCHAR(30) DEFAULT '',
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_account_events_created ON account_events(created_at);
CREATE INDEX IF NOT EXISTS idx_account_events_type_created ON account_events(event_type, created_at);
CREATE TABLE IF NOT EXISTS image_prompt_templates (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT '',
prompt TEXT NOT NULL DEFAULT '',
model VARCHAR(100) DEFAULT '',
size VARCHAR(32) DEFAULT '',
quality VARCHAR(32) DEFAULT '',
output_format VARCHAR(32) DEFAULT '',
background VARCHAR(32) DEFAULT '',
style VARCHAR(64) DEFAULT '',
tags TEXT NOT NULL DEFAULT '[]',
favorite BOOLEAN DEFAULT FALSE,
usage_count INT DEFAULT 0,
last_used_at TIMESTAMPTZ NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_image_prompt_templates_updated ON image_prompt_templates(updated_at);
CREATE INDEX IF NOT EXISTS idx_image_prompt_templates_favorite ON image_prompt_templates(favorite, updated_at);
CREATE TABLE IF NOT EXISTS image_generation_jobs (
id SERIAL PRIMARY KEY,
status VARCHAR(32) NOT NULL DEFAULT 'queued',
prompt TEXT NOT NULL DEFAULT '',
params_json TEXT NOT NULL DEFAULT '{}',
api_key_id INT DEFAULT 0,
api_key_name VARCHAR(255) DEFAULT '',
api_key_masked VARCHAR(64) DEFAULT '',
error_message TEXT DEFAULT '',
duration_ms INT DEFAULT 0,
created_at TIMESTAMPTZ DEFAULT NOW(),
started_at TIMESTAMPTZ NULL,
completed_at TIMESTAMPTZ NULL
);
CREATE INDEX IF NOT EXISTS idx_image_generation_jobs_created ON image_generation_jobs(created_at);
CREATE INDEX IF NOT EXISTS idx_image_generation_jobs_status ON image_generation_jobs(status, created_at);
CREATE TABLE IF NOT EXISTS image_assets (
id SERIAL PRIMARY KEY,
job_id INT NOT NULL DEFAULT 0,
template_id INT DEFAULT 0,
filename VARCHAR(255) NOT NULL DEFAULT '',
storage_path TEXT NOT NULL DEFAULT '',
mime_type VARCHAR(100) NOT NULL DEFAULT '',
bytes INT DEFAULT 0,
width INT DEFAULT 0,
height INT DEFAULT 0,
model VARCHAR(100) DEFAULT '',
requested_size VARCHAR(32) DEFAULT '',
actual_size VARCHAR(32) DEFAULT '',
quality VARCHAR(32) DEFAULT '',
output_format VARCHAR(32) DEFAULT '',
revised_prompt TEXT DEFAULT '',
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_image_assets_created ON image_assets(created_at);
CREATE INDEX IF NOT EXISTS idx_image_assets_job_id ON image_assets(job_id);
`
_, err := db.conn.ExecContext(ctx, query)
if err != nil {
return err
}
// Separate long timeout: convert existing TIMESTAMP columns to TIMESTAMPTZ
// (ALTER COLUMN TYPE can be slow on large tables).
// NOTE(review): the conversion interprets stored values in the session's
// current TIMEZONE setting, so the result depends on the time zone of the
// pooled connection that runs this block — confirm it is UTC.
migrateQuery := `
DO $$
DECLARE
_tbl TEXT;
_col TEXT;
_rec RECORD;
BEGIN
FOR _rec IN
SELECT table_name, column_name
FROM information_schema.columns
WHERE table_schema = current_schema()
AND data_type = 'timestamp without time zone'
AND table_name IN ('accounts', 'usage_logs', 'api_keys', 'proxies', 'account_events')
LOOP
EXECUTE format(
'ALTER TABLE %I ALTER COLUMN %I TYPE TIMESTAMPTZ USING %I AT TIME ZONE current_setting(''TIMEZONE'')',
_rec.table_name, _rec.column_name, _rec.column_name
);
END LOOP;
END $$;
`
// Deliberately not derived from ctx: this step gets its own 5-minute budget.
migrateCtx, migrateCancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer migrateCancel()
_, err = db.conn.ExecContext(migrateCtx, migrateQuery)
return err
}
// ==================== API Keys ====================
// APIKeyRow is one API key row. The fields correspond to the columns listed in
// apiKeySelectColumns (id, name, key, created_at, quota_limit, quota_used,
// expires_at, allowed_group_ids).
type APIKeyRow struct {
ID int64 `json:"id"`
Name string `json:"name"`
Key string `json:"key"`
QuotaLimit float64 `json:"quota_limit"`
QuotaUsed float64 `json:"quota_used"`
// ExpiresAt is nullable (sql.NullTime).
ExpiresAt sql.NullTime `json:"expires_at"`
AllowedGroupIDs []int64 `json:"allowed_group_ids"`
CreatedAt time.Time `json:"created_at"`
}
// APIKeyInput carries the values for a new API key; it is the argument of
// InsertAPIKeyWithOptions, which resets negative QuotaLimit and QuotaUsed
// values to 0 before inserting.
type APIKeyInput struct {
Name string
Key string
QuotaLimit float64
QuotaUsed float64
// ExpiresAt is nullable (sql.NullTime).
ExpiresAt sql.NullTime
AllowedGroupIDs []int64
}
// APIKeyUpdate describes a set of changes to an API key. Every value field is
// paired with a *Set flag.
//
// NOTE(review): the consumer of this type is outside this section; presumably
// a value is only applied when its *Set flag is true, which lets callers
// distinguish "leave unchanged" from "set to the zero value" — confirm.
type APIKeyUpdate struct {
// Name is the new display name.
Name string
// NameSet flags whether Name was supplied.
NameSet bool
// QuotaLimit is the new quota ceiling.
QuotaLimit float64
// QuotaLimitSet flags whether QuotaLimit was supplied.
QuotaLimitSet bool
// ExpiresAt is the new expiration.
ExpiresAt sql.NullTime
// ExpiresAtSet flags whether ExpiresAt was supplied.
ExpiresAtSet bool
// AllowedGroupIDs is the new allowed-group scope.
AllowedGroupIDs []int64
// AllowedGroupIDsSet flags whether AllowedGroupIDs was supplied.
AllowedGroupIDsSet bool
}
// apiKeySelectColumns is the column list shared by the api_keys SELECT queries
// in ListAPIKeys and GetAPIKeyByValue. NULL quota_limit and quota_used are
// coalesced to 0 and a NULL allowed_group_ids is coalesced to '[]'.
//
// NOTE(review): rows selected with this list are decoded by scanAPIKeyRow, so
// the column order presumably has to match that scanner — confirm before
// reordering or adding columns.
const apiKeySelectColumns = `id, name, key, created_at, COALESCE(quota_limit, 0), COALESCE(quota_used, 0), expires_at, COALESCE(allowed_group_ids, '[]')`
// ListAPIKeys returns every row of api_keys ordered by id.
//
// The returned slice is nil when the table is empty. If iteration stops with
// an error, the rows decoded so far are returned together with that error.
func (db *DB) ListAPIKeys(ctx context.Context) ([]*APIKeyRow, error) {
	const listQuery = `SELECT ` + apiKeySelectColumns + ` FROM api_keys ORDER BY id`

	cursor, queryErr := db.conn.QueryContext(ctx, listQuery)
	if queryErr != nil {
		return nil, queryErr
	}
	defer cursor.Close()

	var result []*APIKeyRow
	for cursor.Next() {
		entry, scanErr := scanAPIKeyRow(cursor)
		if scanErr != nil {
			return nil, scanErr
		}
		result = append(result, entry)
	}

	// Surface any error that ended the iteration early.
	iterErr := cursor.Err()
	return result, iterErr
}
// CountAPIKeys returns the current number of rows in api_keys.
// On a query or scan failure it returns 0 and the error.
func (db *DB) CountAPIKeys(ctx context.Context) (int, error) {
	var total int
	row := db.conn.QueryRowContext(ctx, `SELECT COUNT(*) FROM api_keys`)
	err := row.Scan(&total)
	if err != nil {
		return 0, err
	}
	return total, nil
}
// GetAPIKeyByValue looks up an API key's metadata by its full key value. It is
// used for the per-key cache on the authentication hot path.
//
// It returns sql.ErrNoRows when no row matches key. An error raised while
// fetching the first row (for example a cancelled ctx or a dropped connection)
// is returned as-is instead of being reported as sql.ErrNoRows, so callers do
// not mistake a transient database failure for a missing key.
func (db *DB) GetAPIKeyByValue(ctx context.Context, key string) (*APIKeyRow, error) {
	rows, err := db.conn.QueryContext(ctx, `SELECT `+apiKeySelectColumns+` FROM api_keys WHERE key = $1`, key)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	if !rows.Next() {
		// Next reports false both for an empty result set and for an
		// iteration error; only the former means "key not found".
		if err := rows.Err(); err != nil {
			return nil, err
		}
		return nil, sql.ErrNoRows
	}
	return scanAPIKeyRow(rows)
}
// InsertAPIKey inserts a new API key that has only a name and a key value;
// every other APIKeyInput field is left at its zero value. It returns the
// result of InsertAPIKeyWithOptions.
func (db *DB) InsertAPIKey(ctx context.Context, name, key string) (int64, error) {
	input := APIKeyInput{
		Name: name,
		Key:  key,
	}
	return db.InsertAPIKeyWithOptions(ctx, input)
}
// InsertAPIKeyWithOptions inserts a new api_keys row built from input and
// returns the result of insertRowID (presumably the new row's id — the helper
// is defined outside this section).
//
// Negative QuotaLimit and QuotaUsed values are stored as 0, an invalid
// ExpiresAt is bound as nil, and AllowedGroupIDs is encoded with
// encodeInt64SliceJSON.
func (db *DB) InsertAPIKeyWithOptions(ctx context.Context, input APIKeyInput) (int64, error) {
	// Two statement variants are handed to insertRowID: one with a ::jsonb cast
	// and RETURNING id, one without either.
	const (
		insertReturningID = `INSERT INTO api_keys (name, key, quota_limit, quota_used, expires_at, allowed_group_ids) VALUES ($1, $2, $3, $4, $5, $6::jsonb) RETURNING id`
		insertPlain       = `INSERT INTO api_keys (name, key, quota_limit, quota_used, expires_at, allowed_group_ids) VALUES ($1, $2, $3, $4, $5, $6)`
	)

	quotaLimit, quotaUsed := input.QuotaLimit, input.QuotaUsed
	if quotaLimit < 0 {
		quotaLimit = 0
	}
	if quotaUsed < 0 {
		quotaUsed = 0
	}

	return db.insertRowID(ctx, insertReturningID, insertPlain,
		input.Name,
		input.Key,
		quotaLimit,
		quotaUsed,
		nullableTimeArg(input.ExpiresAt),
		encodeInt64SliceJSON(input.AllowedGroupIDs),
	)
}
// nullableTimeArg converts a sql.NullTime into a query argument: the wrapped
// time.Time when value.Valid is true, otherwise an untyped nil.
func nullableTimeArg(value sql.NullTime) interface{} {
	if value.Valid {
		return value.Time
	}
	return nil
}
// IsExpired reports whether the key has an expiration that is at or before
// now. A nil row or a row without a valid ExpiresAt is never expired.
func (row *APIKeyRow) IsExpired(now time.Time) bool {
	if row == nil || !row.ExpiresAt.Valid {
		return false
	}
	stillValid := row.ExpiresAt.Time.After(now)
	return !stillValid
}
// IsQuotaExhausted reports whether the key has a positive QuotaLimit and
// QuotaUsed has reached or passed it. A nil row, or a row whose QuotaLimit is
// not positive, is never exhausted.
func (row *APIKeyRow) IsQuotaExhausted() bool {
	if row == nil {
		return false
	}
	if row.QuotaLimit <= 0 {
		return false
	}
	return row.QuotaUsed >= row.QuotaLimit
}
// HasAccessConstraints reports whether the key carries any restriction: a
// positive QuotaLimit, a valid ExpiresAt, or a non-empty AllowedGroupIDs.
// A nil row has none.
func (row *APIKeyRow) HasAccessConstraints() bool {
	if row == nil {
		return false
	}
	switch {
	case row.QuotaLimit > 0, row.ExpiresAt.Valid, len(row.AllowedGroupIDs) > 0:
		return true
	}
	return false
}
// UpdateAPIKeyName changes the display name of the API key with the given id;
// the key value itself is not touched. It returns sql.ErrNoRows when the
// update affected no row.
func (db *DB) UpdateAPIKeyName(ctx context.Context, id int64, name string) error {
	result, err := db.conn.ExecContext(ctx, `UPDATE api_keys SET name = $1 WHERE id = $2`, name, id)
	if err != nil {
		return err
	}

	switch changed, err := result.RowsAffected(); {
	case err != nil:
		return err
	case changed == 0:
		return sql.ErrNoRows
	default:
		return nil
	}
}
// UpdateAPIKeyQuotaLimit sets the quota ceiling of the API key with the given
// id. A negative quotaLimit is stored as 0, so any non-positive value clears
// the limit. It returns sql.ErrNoRows when the update affected no row.
func (db *DB) UpdateAPIKeyQuotaLimit(ctx context.Context, id int64, quotaLimit float64) error {
	limit := quotaLimit
	if limit < 0 {
		limit = 0
	}

	result, execErr := db.conn.ExecContext(ctx, `UPDATE api_keys SET quota_limit = $1 WHERE id = $2`, limit, id)
	if execErr != nil {
		return execErr
	}

	changed, countErr := result.RowsAffected()
	if countErr != nil {
		return countErr
	}
	if changed != 0 {
		return nil
	}
	return sql.ErrNoRows
}
// UpdateAPIKeyExpiresAt sets the expiration of the API key with the given id,
// or clears it when expiresAt.Valid is false (a nil argument is bound).
// It returns sql.ErrNoRows when the update affected no row.
func (db *DB) UpdateAPIKeyExpiresAt(ctx context.Context, id int64, expiresAt sql.NullTime) error {
	expiryArg := nullableTimeArg(expiresAt)

	outcome, err := db.conn.ExecContext(ctx, `UPDATE api_keys SET expires_at = $1 WHERE id = $2`, expiryArg, id)
	if err != nil {
		return err
	}

	touched, err := outcome.RowsAffected()
	switch {
	case err != nil:
		return err
	case touched == 0:
		return sql.ErrNoRows
	}
	return nil
}
// UpdateAPIKeyAllowedGroups stores the allowed-group scope of the API key with
// the given id, encoded with encodeInt64SliceJSON. An empty slice clears the
// scope (the key may schedule any account). It returns sql.ErrNoRows when the
// update affected no row.
func (db *DB) UpdateAPIKeyAllowedGroups(ctx context.Context, id int64, groupIDs []int64) error {
	encoded := encodeInt64SliceJSON(groupIDs)

	// The default statement casts the payload to jsonb; on SQLite the payload
	// is bound without a cast.
	stmt := `UPDATE api_keys SET allowed_group_ids = $1::jsonb WHERE id = $2`
	if db.isSQLite() {
		stmt = `UPDATE api_keys SET allowed_group_ids = $1 WHERE id = $2`
	}

	result, err := db.conn.ExecContext(ctx, stmt, encoded, id)
	if err != nil {
		return err
	}

	changed, err := result.RowsAffected()
	if err != nil {
		return err
	}
	if changed != 0 {
		return nil
	}
	return sql.ErrNoRows
}
// UpdateAPIKeyAllowedGroupIDs is an alias for UpdateAPIKeyAllowedGroups: it
// forwards its arguments unchanged and returns that call's error.
func (db *DB) UpdateAPIKeyAllowedGroupIDs(ctx context.Context, id int64, groupIDs []int64) error {
	err := db.UpdateAPIKeyAllowedGroups(ctx, id, groupIDs)
	return err
}