33import java .io .IOException ;
44import java .util .List ;
55import java .util .Map ;
6- import java .util .concurrent .ExecutorService ;
7- import java .util .concurrent .Executors ;
8- import java .util .concurrent .LinkedBlockingQueue ;
96import org .apache .commons .cli .CommandLine ;
10- import org .apache .commons .lang3 .StringUtils ;
117import org .slf4j .Logger ;
128import org .slf4j .LoggerFactory ;
9+ import com .dtstack .logstash .assembly .pthread .FilterThread ;
10+ import com .dtstack .logstash .assembly .pthread .InputThread ;
11+ import com .dtstack .logstash .assembly .pthread .OutputThread ;
12+ import com .dtstack .logstash .assembly .qlist .InputQueueList ;
13+ import com .dtstack .logstash .assembly .qlist .OutPutQueueList ;
1314import com .dtstack .logstash .configs .YamlConfig ;
14- import com .dtstack .logstash .factory . FilterFactory ;
15+ import com .dtstack .logstash .exception . ExceptionUtil ;
1516import com .dtstack .logstash .factory .InputFactory ;
16- import com .dtstack .logstash .factory .OutputFactory ;
17- import com .dtstack .logstash .filters .BaseFilter ;
1817import com .dtstack .logstash .inputs .BaseInput ;
1918import com .dtstack .logstash .outputs .BaseOutput ;
20- import com .dtstack .logstash .property .SystemProperty ;
21- import com .dtstack .logstash .utils .Machine ;
2219import com .google .common .collect .Lists ;
2320
2421/**
3229public class AssemblyPipeline {
3330
3431 private static Logger logger = LoggerFactory .getLogger (AssemblyPipeline .class );
32+
33+ private InputQueueList initInputQueueList ;
3534
36- private static ExecutorService filterOutputExecutor =null ;
37-
38- private static ExecutorService inputExecutor =null ;
39-
40- private InputQueueList initInputQueueList =null ;
41-
42- private List <BaseInput > baseInputs =null ;
35+ private OutPutQueueList initOutputQueueList ;
36+
37+ private List <BaseInput > baseInputs ;
4338
4439 private List <BaseOutput > allBaseOutputs = Lists .newCopyOnWriteArrayList ();
4540
41+
4642 /**
4743 * 组装管道
4844 * @param cmdLine
4945 * @return
5046 * @throws IOException
5147 */
5248 @ SuppressWarnings ({ "unchecked" , "rawtypes" })
53- public InputQueueList assemblyPipeline (CommandLine cmdLine ) throws IOException {
49+ public void assemblyPipeline (CommandLine cmdLine ) throws IOException {
5450 try {
5551 logger .debug ("load config start ..." );
5652 Map configs = new YamlConfig ().parse (cmdLine .getOptionValue ("f" ));
5753 logger .debug (configs .toString ());
5854 logger .debug ("initInputQueueList start ..." );
59- initInputQueueList =initInputQueueList (cmdLine );
55+ initInputQueueList =InputQueueList .getInputQueueListInstance (CmdLineParams .getFilterWork (cmdLine ), CmdLineParams .getInputQueueSize (cmdLine ));
56+ if (initInputQueueList ==null ||initInputQueueList .getQueueList ().size ()==0 ){
57+ logger .error ("init inputQueueList is error" );
58+ System .exit (1 );
59+ }
6060 List <Map > inputs = (List <Map >) configs .get ("inputs" );
6161 if (inputs ==null ||inputs .size ()==0 ){
6262 logger .error ("input plugin is not empty" );
6363 System .exit (1 );
6464 }
65-
66- List <Map > outputs = (List <Map >) configs .get ("outputs" );
65+ logger .debug ("initOutputQueueList start ..." );
66+ initOutputQueueList = OutPutQueueList .getOutPutQueueListInstance (CmdLineParams .getOutputWork (cmdLine ), CmdLineParams .getOutputQueueSize (cmdLine ));
67+ if (initOutputQueueList ==null ||initOutputQueueList .getQueueList ().size ()==0 ){
68+ logger .error ("init outputQueueList is error" );
69+ System .exit (1 );
70+ }
71+ List <Map > outputs = (List <Map >) configs .get ("outputs" );
6772 if (outputs ==null ||outputs .size ()==0 ){
6873 logger .error ("output plugin is not empty" );
6974 System .exit (1 );
7075 }
7176 List <Map > filters = (List <Map >) configs .get ("filters" );
7277 logger .debug ("init input plugin start ..." );
73- baseInputs =InputFactory .getBatchInstance (inputs , initInputQueueList );
78+ baseInputs =InputFactory .getBatchInstance (inputs ,initInputQueueList );
7479 initInputQueueList .startElectionIdleQueue ();
75- if (isInputQueueSizeLog (cmdLine ))initInputQueueList .startLogQueueSize ();
80+ initOutputQueueList .startElectionIdleQueue ();
81+ if (CmdLineParams .isQueueSizeLog (cmdLine )){
82+ initInputQueueList .startLogQueueSize ();
83+ initOutputQueueList .startLogQueueSize ();
84+ }
7685 logger .debug ("input thread start ..." );
77- initInputPutThread (baseInputs );
78- logger .debug ("FilterAndOutput thread start ..." );
79- initFilterAndOutputThread (outputs ,filters ,initInputQueueList .getQueueList (),getOutBatchSize (cmdLine ));
86+ InputThread .initInputThread (baseInputs );
87+ logger .debug ("filter thread start ..." );
88+ FilterThread .initFilterThread (filters ,initInputQueueList ,initOutputQueueList );
89+ logger .debug ("output thread start ..." );
90+ OutputThread .initOutPutThread (outputs ,initOutputQueueList ,allBaseOutputs );
91+ //add shutdownhook
92+ ShutDownHook shutDownHook = new ShutDownHook (initInputQueueList ,initOutputQueueList ,baseInputs ,allBaseOutputs );
93+ shutDownHook .addShutDownHook ();
8094 }catch (Exception t ){
81- logger .error ("assemblyPipeline is error:{}" , t . getCause ( ));
95+ logger .error ("assemblyPipeline is error:{}" ,ExceptionUtil . getErrorMessage ( t ));
8296 System .exit (1 );
8397 }
84- return initInputQueueList ;
8598 }
86-
87-
88- protected InputQueueList initInputQueueList (CommandLine cmdLine ){
89- int filterWorks = getFilterWork (cmdLine );
90- int queueSize = getInputQueueSize (cmdLine );
91- InputQueueList queueList = new InputQueueList ();
92- List <LinkedBlockingQueue <Map <String ,Object >>> list =queueList .getQueueList ();
93- for (int i =0 ;i <filterWorks ;i ++){
94- list .add (new LinkedBlockingQueue <Map <String ,Object >>(queueSize ));
95- }
96- return queueList ;
97- }
98-
99-
100- protected void initInputPutThread (List <BaseInput > baseInputs ) {
101- // TODO Auto-generated method stub
102- inputExecutor = Executors .newFixedThreadPool (baseInputs .size ());
103- for (BaseInput input :baseInputs ){
104- inputExecutor .submit (new InputThread (input ));
105- }
106- }
107-
108- @ SuppressWarnings ("rawtypes" )
109- protected void initFilterAndOutputThread (List <Map > outputs , List <Map > filters , List <LinkedBlockingQueue <Map <String ,Object >>> queues ,int batchSize ) throws Exception {
110- filterOutputExecutor = Executors .newFixedThreadPool (queues .size ());
111- for (LinkedBlockingQueue <Map <String ,Object >> queue :queues ){
112- List <BaseOutput > baseOutputs = OutputFactory .getBatchInstance (outputs );
113- List <BaseFilter > baseFilters = FilterFactory .getBatchInstance (filters );
114- filterOutputExecutor .submit (new FilterAndOutputThread (queue ,baseFilters ,baseOutputs ,batchSize ));
115- allBaseOutputs .addAll (baseOutputs );
116- }
117- }
118-
119- /**
120- * 获取filter线程数
121- * @param line
122- * @return
123- */
124- protected static int getFilterWork (CommandLine line ){
125- String number =line .getOptionValue ("w" );
126- int works =StringUtils .isNotBlank (number )?Integer .parseInt (number ):Machine .availableProcessors ();
127- logger .warn ("getFilterWork--->" +works );
128- return works ;
129- }
130-
131- /**
132- *获取queue size的大小
133- * @param line
134- * @return
135- */
136- protected static int getInputQueueSize (CommandLine line ){
137- String number =line .getOptionValue ("q" );
138- return StringUtils .isNotBlank (number )?Integer .parseInt (number ):Integer .parseInt (SystemProperty .getSystemProperty ("inputQueueSize" ));
139- }
140-
141- /**
142- * 获取batch size的大小
143- * @param line
144- * @return
145- */
146- protected static int getOutBatchSize (CommandLine line ){
147- String number =line .getOptionValue ("b" );
148- return StringUtils .isNotBlank (number )?Integer .parseInt (number ):Integer .parseInt (SystemProperty .getSystemProperty ("batchSize" ));
149- }
150-
151-
152- /**
153- * 是否开启InputQueueSize log日志标准输出
154- * @param line
155- * @return
156- */
157- protected static boolean isInputQueueSizeLog (CommandLine line ){
158- return line .hasOption ("t" );
159- }
160-
161-
162- public List <BaseInput > getBaseInputs () {
163- return this .baseInputs ;
164- }
165-
166-
167- public List <BaseOutput > getAllBaseOutputs () {
168- return allBaseOutputs ;
169- }
170-
171- }
99+ }
0 commit comments