88using Microsoft . Extensions . Options ;
99using SocketIOClient ;
1010using System . Diagnostics ;
11+ using System . IO ;
1112using System . Net ;
1213using System . Runtime . InteropServices ;
1314using System . Security . Cryptography ;
@@ -29,10 +30,11 @@ public class Cluster : RunnableBase
2930 private HttpClient client ;
3031 public Guid guid ;
3132 private SocketIOClient . SocketIO socket ;
32- public bool IsEnabled { get ; private set ; }
33+ public bool IsEnabled { get ; set ; }
3334 private Task ? _keepAlive ;
3435 protected IStorage storage ;
3536 protected AccessCounter counter ;
37+ public CancellationTokenSource cancellationSrc = new CancellationTokenSource ( ) ;
3638 //List<Task> tasks = new List<Task>();
3739
3840 public Cluster ( ClusterInfo info , TokenManager token ) : base ( )
@@ -50,22 +52,28 @@ public Cluster(ClusterInfo info, TokenManager token) : base()
5052 this . storage = new FileStorage ( SharedData . Config . clusterFileDirectory ) ;
5153
5254 this . counter = new ( ) ;
55+ InitializeSocket ( ) ;
5356
57+ // 用来规避构造函数退出时巴拉巴拉的提示
58+ if ( this . socket == null )
59+ {
60+ throw new Exception ( "Impossible! \" socket\" field is still null." ) ;
61+ }
62+ }
63+
64+ protected void InitializeSocket ( )
65+ {
5466 this . socket = new ( HttpRequest . client . BaseAddress ? . ToString ( ) , new SocketIOOptions ( )
5567 {
5668 Transport = SocketIOClient . Transport . TransportProtocol . WebSocket ,
5769 Auth = new
5870 {
59- token = token . Token . token
71+ token = this . token . Token . token
6072 }
6173 } ) ;
62-
6374 }
6475
65- private void HandleError ( SocketIOResponse resp )
66- {
67- return ;
68- }
76+ public void HandleError ( SocketIOResponse resp ) => Utils . PrintResponseMessage ( resp ) ;
6977
7078 protected override int Run ( string [ ] args )
7179 {
@@ -95,15 +103,16 @@ protected async Task<int> AsyncRun()
95103
96104 SharedData . Logger . LogInfo ( $ "工作进程 { guid } 在 <{ SharedData . Config . HOST } :{ SharedData . Config . PORT } > 提供服务") ;
97105
98- _keepAlive = Task . Run ( ( ) =>
106+ _keepAlive = Task . Run ( async ( ) =>
99107 {
100108 while ( true )
101109 {
102- Thread . Sleep ( 25 * 1000 ) ;
110+ cancellationSrc . Token . ThrowIfCancellationRequested ( ) ;
111+ await Task . Delay ( 25 * 1000 ) ;
103112 // Disable().Wait();
104- KeepAlive ( ) . Wait ( ) ;
113+ await KeepAlive ( ) ;
105114 }
106- } ) ;
115+ } , cancellationSrc . Token ) ;
107116
108117 _keepAlive . Wait ( ) ;
109118
@@ -161,7 +170,7 @@ public void Connect()
161170 this . socket . ConnectAsync ( ) . Wait ( ) ;
162171
163172 this . socket . On ( "error" , error => HandleError ( error ) ) ;
164- this . socket . On ( "message" , msg => PrintServerMessage ( msg ) ) ;
173+ this . socket . On ( "message" , msg => Utils . PrintResponseMessage ( msg ) ) ;
165174 this . socket . On ( "connect" , ( _ ) => SharedData . Logger . LogInfo ( "与主控连接成功" ) ) ;
166175 this . socket . On ( "disconnect" , ( r ) =>
167176 {
@@ -170,19 +179,16 @@ public void Connect()
170179 } ) ;
171180 }
172181
173- private static void PrintServerMessage ( SocketIOResponse msg )
174- {
175- SharedData . Logger . LogInfo ( msg ) ;
176- }
177-
178182 public async Task Enable ( )
179183 {
180- await socket . EmitAsync ( "enable" ,
184+ if ( ! this . socket . Connected || ! this . IsEnabled )
185+ await socket . EmitAsync ( "enable" ,
181186 ( SocketIOResponse resp ) =>
182187 {
183- SharedData . Logger . LogInfo ( resp ) ;
188+ Utils . PrintResponseMessage ( resp ) ;
184189 // Debugger.Break();
185190 SharedData . Logger . LogInfo ( $ "启用成功") ;
191+ this . IsEnabled = true ;
186192 } ,
187193 new
188194 {
@@ -201,22 +207,25 @@ await socket.EmitAsync("enable",
201207
202208 public async Task Disable ( )
203209 {
204- await socket . EmitAsync ( "disable" ,
210+ if ( this . IsEnabled )
211+ await socket . EmitAsync ( "disable" ,
205212 ( SocketIOResponse resp ) =>
206213 {
207- SharedData . Logger . LogInfo ( resp ) ;
214+ Utils . PrintResponseMessage ( resp ) ;
208215 SharedData . Logger . LogInfo ( $ "禁用成功") ;
216+ this . IsEnabled = false ;
209217 } ) ;
210218 }
211219
212220 public async Task KeepAlive ( )
213221 {
222+ if ( ! this . IsEnabled ) return ;
214223 string time = DateTime . Now . ToStandardTimeString ( ) ;
215224 // socket.Connected.Dump();
216225 await socket . EmitAsync ( "keep-alive" ,
217226 ( SocketIOResponse resp ) =>
218227 {
219- SharedData . Logger . LogInfo ( resp ) ;
228+ Utils . PrintResponseMessage ( resp ) ;
220229 SharedData . Logger . LogInfo ( $ "保活成功 at { time } ,served { Utils . GetLength ( this . counter . bytes ) } ({ this . counter . bytes } bytes)/{ this . counter . hits } hits") ;
221230 this . counter . Reset ( ) ;
222231 } ,
@@ -234,27 +243,27 @@ public async Task GetConfiguration()
234243 var content = await resp . Content . ReadAsStringAsync ( ) ;
235244 }
236245
246+
237247 protected async Task CheckFiles ( )
238248 {
239- SharedData . Logger . LogInfo ( "开始检查文件" ) ;
240- var resp = await client . GetAsync ( "openbmclapi/files" ) ;
241- byte [ ] buffer = await resp . Content . ReadAsByteArrayAsync ( ) ;
242- var decompressor = new Decompressor ( ) ;
243- var data = decompressor . Unwrap ( buffer ) . ToArray ( ) ;
244- buffer = data ;
249+ const string avroString = @"{""type"": ""array"",""items"": {""type"": ""record"",""name"": ""fileinfo"",""fields"": [{""name"": ""path"", ""type"": ""string""},{""name"": ""hash"", ""type"": ""string""},{""name"": ""size"", ""type"": ""long""}]}}" ;
245250
246- string avroString = @"{""type"": ""array"",""items"": {""type"": ""record"",""name"": ""fileinfo"",""fields"": [{""name"": ""path"", ""type"": ""string""},{""name"": ""hash"", ""type"": ""string""},{""name"": ""size"", ""type"": ""long""}]}}" ;
251+ var resp = await this . client . GetAsync ( "openbmclapi/files" ) ;
252+ byte [ ] bytes = await resp . Content . ReadAsByteArrayAsync ( ) ;
253+ var decompressor = new Decompressor ( ) ;
254+ bytes = decompressor . Unwrap ( bytes ) . ToArray ( ) ;
247255
248256 Schema schema = Schema . Parse ( avroString ) ;
249-
250- Avro . IO . Decoder decoder = new BinaryDecoder ( new MemoryStream ( buffer ) ) ;
257+ var decoder = new BinaryDecoder ( new MemoryStream ( bytes ) ) ;
251258
252259 object [ ] files = new GenericDatumReader < object [ ] > ( schema , schema ) . Read ( null ! , decoder ) ;
253260
254261 object countLock = new ( ) ;
255262 int count = 0 ;
256263
257- Parallel . ForEach ( files , ( obj ) =>
264+ SharedData . Logger . LogDebug ( $ "文件检查策略:{ SharedData . Config . startupCheckMode } ") ;
265+
266+ Parallel . ForEach ( files , async ( obj ) =>
258267 //foreach (var obj in files)
259268 {
260269 GenericRecord ? record = obj as GenericRecord ;
@@ -270,44 +279,56 @@ protected async Task CheckFiles()
270279
271280 if ( long . TryParse ( t . ToString ( ) . ThrowIfNull ( ) , out size ) )
272281 {
273- DownloadFile ( path , hash ) . Wait ( ) ;
282+ await DownloadFile ( hash , path ) ;
274283 lock ( countLock )
275284 {
276285 count ++ ;
277286 }
287+ VerifyFile ( hash , size , SharedData . Config . startupCheckMode ) ;
278288 SharedData . Logger . LogInfoNoNewLine ( $ "\r { count } /{ files . Length } ") ;
279289 }
280290 }
281291 } ) ;
282- SharedData . Logger . LogInfo ( "\n 文件校验完毕" ) ;
283292 }
284293
285- private async Task DownloadFile ( string path , string hash , bool fullcheck = true )
294+ protected bool VerifyFile ( string hash , long size , FileVerificationMode mode )
295+ {
296+ string path = Path . Combine ( SharedData . Config . cacheDirectory , Utils . HashToFileName ( hash ) ) ;
297+
298+ switch ( mode )
299+ {
300+ case FileVerificationMode . None :
301+ return true ;
302+ case FileVerificationMode . Exists :
303+ return File . Exists ( path ) ;
304+ case FileVerificationMode . SizeOnly :
305+ if ( ! VerifyFile ( hash , size , FileVerificationMode . Exists ) ) return false ;
306+ FileInfo fileInfo = new FileInfo ( path ) ;
307+ return fileInfo . Length == size ;
308+ case FileVerificationMode . Hash :
309+ if ( ! VerifyFile ( hash , size , FileVerificationMode . SizeOnly ) ) return false ;
310+ var file = File . ReadAllBytes ( path ) ;
311+ return Utils . ValidateFile ( file , hash ) ;
312+ default :
313+ return true ;
314+ }
315+ }
316+
317+ private async Task DownloadFile ( string hash , string path )
286318 {
287319 string filePath = Path . Combine ( SharedData . Config . cacheDirectory , Utils . HashToFileName ( hash ) ) ;
288- if ( ! File . Exists ( filePath ) )
320+ if ( File . Exists ( filePath ) )
289321 {
290- var resp = await HttpRequest . client . GetAsync ( $ "openbmclapi/download/{ hash } ") ;
291- using ( var file = File . Create ( Path . Combine ( SharedData . Config . cacheDirectory , Utils . HashToFileName ( hash ) ) ) )
292- {
293- file . Write ( await resp . Content . ReadAsByteArrayAsync ( ) ) ;
294- }
295- SharedData . Logger . LogInfo ( $ "文件 { path } 下载完毕") ;
322+ return ;
296323 }
297- else
324+
325+ var resp = await this . client . GetAsync ( $ "openbmclapi/download/{ hash } ") ;
326+
327+ using ( var file = File . Create ( filePath ) )
298328 {
299- if ( fullcheck )
300- {
301- var file = File . ReadAllBytes ( filePath ) ;
302- bool valid = Utils . ValidateFile ( file , hash , out string realHash ) ;
303- if ( ! valid )
304- {
305- SharedData . Logger . LogInfo ( $ "文件 { path } 损坏(期望哈希值为 { hash } ,实际为 { realHash } )") ;
306- File . Delete ( filePath ) ;
307- await DownloadFile ( path , hash ) ;
308- }
309- }
329+ file . Write ( await resp . Content . ReadAsByteArrayAsync ( ) ) ;
310330 }
331+ SharedData . Logger . LogInfo ( $ "文件 { path } 下载成功") ;
311332 }
312333
313334 public async Task RequestCertification ( )
0 commit comments