1+ using System ;
2+ using System . Collections . Generic ;
3+ using System . IO ;
4+ using System . Linq ;
5+ using System . Threading . Tasks ;
6+ using Seq . Api ;
7+ using SeqCli . Config ;
8+ using SeqCli . Forwarder . Filesystem . System ;
9+ using Serilog ;
10+
11+ namespace SeqCli . Forwarder . Channel ;
12+
13+ class ApiKeyForwardingChannelWrapper : ForwardingChannelWrapper
14+ {
15+ readonly Dictionary < string , ForwardingChannel > _channelsByApiKey = new ( ) ;
16+ const string EmptyApiKeyChannelId = "EmptyApiKey" ;
17+
18+ public ApiKeyForwardingChannelWrapper ( string bufferPath , SeqConnection connection , SeqCliConfig config ) : base ( bufferPath , connection , config )
19+ {
20+ LoadChannels ( ) ;
21+ }
22+
23+ // Start forwarding channels found on the file system.
24+ void LoadChannels ( )
25+ {
26+ foreach ( var directoryPath in Directory . EnumerateDirectories ( BufferPath ) )
27+ {
28+ if ( directoryPath . Equals ( GetStorePath ( SeqCliConnectionChannelId ) ) )
29+ {
30+ // data was stored when not using API key forwarding
31+ continue ;
32+ }
33+
34+ string apiKey , channelId ;
35+
36+ if ( new SystemStoreDirectory ( directoryPath ) . TryReadApiKey ( Config , out var key ) )
37+ {
38+ apiKey = key ! ;
39+ channelId = directoryPath ;
40+ }
41+ else
42+ {
43+ // directory should contain an api key file but does not
44+ continue ;
45+ }
46+
47+ var created = OpenOrCreateChannel ( channelId , apiKey ) ;
48+ _channelsByApiKey . Add ( apiKey , created ) ;
49+ }
50+ }
51+
52+ public override ForwardingChannel GetForwardingChannel ( string ? requestApiKey )
53+ {
54+ lock ( ChannelsSync )
55+ {
56+ // use empty string to represent no api key
57+ if ( _channelsByApiKey . TryGetValue ( requestApiKey ?? "" , out var channel ) )
58+ {
59+ return channel ;
60+ }
61+
62+ var channelId = ApiKeyToId ( requestApiKey ) ;
63+ var created = OpenOrCreateChannel ( channelId , requestApiKey ) ;
64+ var store = new SystemStoreDirectory ( GetStorePath ( channelId ) ) ;
65+ store . WriteApiKey ( Config , requestApiKey ?? "" ) ;
66+ _channelsByApiKey . Add ( requestApiKey ?? "" , created ) ;
67+ return created ;
68+ }
69+ }
70+
71+ string ApiKeyToId ( string ? apiKey )
72+ {
73+ return string . IsNullOrEmpty ( apiKey ) ? EmptyApiKeyChannelId : Guid . NewGuid ( ) . ToString ( ) ;
74+ }
75+
76+ public override async Task StopAsync ( )
77+ {
78+ Log . ForContext < ApiKeyForwardingChannelWrapper > ( ) . Information ( "Flushing log buffers" ) ;
79+ ShutdownTokenSource . CancelAfter ( TimeSpan . FromSeconds ( 30 ) ) ;
80+
81+ Task [ ] stopChannels ;
82+ lock ( ChannelsSync )
83+ {
84+ stopChannels = _channelsByApiKey . Values . Select ( ch => ch . StopAsync ( ) ) . ToArray ( ) ;
85+ }
86+
87+ await Task . WhenAll ( [ ..stopChannels ] ) ;
88+ await ShutdownTokenSource . CancelAsync ( ) ;
89+ }
90+ }
0 commit comments