3838import org .springframework .scheduling .annotation .Scheduled ;
3939import org .springframework .stereotype .Service ;
4040import org .springframework .transaction .annotation .Transactional ;
41+ import org .springframework .transaction .support .TransactionSynchronizationManager ;
4142import org .springframework .util .CollectionUtils ;
4243import org .springframework .util .StringUtils ;
4344
@@ -157,7 +158,7 @@ private void checkDataInputStatus(List<UtmDataInputStatus> inputs, String server
157158
158159 long currentTimeInSeconds = TimeUnit .MILLISECONDS .toSeconds (System .currentTimeMillis ());
159160 List <UtmDataInputStatus > inTime = inputs .stream ().filter (row -> (currentTimeInSeconds - row .getTimestamp ()) < 3600 )
160- .collect ( Collectors . toList () );
161+ .toList ();
161162 if (!CollectionUtils .isEmpty (inTime ))
162163 return ;
163164
@@ -219,7 +220,7 @@ public void syncDataInputStatus() {
219220 * Gets the sources from utm_data_input_status that are not registered in utm_network_scan table
220221 * and create new assets with it. This method is a schedule with a delay of 1 hour
221222 */
222- @ Scheduled (fixedDelay = 15000 , initialDelay = 30000 )
223+ @ Scheduled (fixedDelay = 30000 , initialDelay = 60000 )
223224 public void syncSourcesToAssets () {
224225 final String ctx = CLASSNAME + ".syncSourcesToAssets" ;
225226 try {
@@ -231,55 +232,73 @@ public void syncSourcesToAssets() {
231232 }
232233 }
233234
235+ @ Transactional
234236 public void synchronizeSourcesToAssets () {
235237 final String ctx = CLASSNAME + ".syncSourcesToAssets" ;
238+
236239 try {
237- final List <String > excludeOfTypes = dataTypesRepository .findAllByIncludedFalse ().stream ()
238- .map (UtmDataTypes ::getDataType ).collect (Collectors .toList ());
239- excludeOfTypes .addAll (Arrays .asList ("utmstack" , "UTMStack" , DataSourceConstants .IBM_AS400_TYPE ));
240-
241- List <UtmDataInputStatus > sources = dataInputStatusRepository .extractSourcesToExport (excludeOfTypes );
242- if (!CollectionUtils .isEmpty (sources )) {
243- //return;
244-
245- Map <String , Boolean > sourcesWithStatus = extractSourcesWithUpDownStatus (sources );
246- List <UtmNetworkScan > assets = networkScanService .findAll ();
247-
248- List <UtmNetworkScan > saveOrUpdate = new ArrayList <>();
249- sourcesWithStatus .forEach ((key , value ) -> {
250- Optional <UtmNetworkScan > assetOpt = assets .stream ()
251- .filter (asset -> ((StringUtils .hasText (asset .getAssetIp ()) && asset .getAssetIp ().equals (key ))
252- || (StringUtils .hasText (asset .getAssetName ()) && asset .getAssetName ().equals (key ))))
253- .findFirst ();
254- if (assetOpt .isPresent ()) {
255- UtmNetworkScan utmAsset = assetOpt .get ();
256- if (Objects .isNull (utmAsset .getUpdateLevel ())
257- || utmAsset .getUpdateLevel ().equals (UpdateLevel .DATASOURCE )) {
258- utmAsset .assetAlive (value )
259- .updateLevel (UpdateLevel .DATASOURCE )
260- .assetStatus (AssetStatus .CHECK )
261- .modifiedAt (LocalDateTime .now ().toInstant (ZoneOffset .UTC ));
262- saveOrUpdate .add (utmAsset );
263- }
264- } else {
265- saveOrUpdate .add (new UtmNetworkScan (key , value ));
266- }
267- });
268-
269- assets .forEach (asset -> {
270- if (!sourcesWithStatus .containsKey (asset .getAssetIp ()) && !sourcesWithStatus .containsKey (asset .getAssetName ())
271- && !Objects .isNull (asset .getUpdateLevel ()) && asset .getUpdateLevel ().equals (UpdateLevel .DATASOURCE )) {
272- asset .assetStatus (AssetStatus .MISSING ).updateLevel (null )
273- .modifiedAt (LocalDateTime .now ().toInstant (ZoneOffset .UTC ));
274- saveOrUpdate .add (asset );
275- }
276- });
277240
278- networkScanService .saveAll (saveOrUpdate );
241+ List <String > excludeDataTypes = dataTypesRepository .findAllByIncludedFalse ()
242+ .stream ()
243+ .map (UtmDataTypes ::getDataType )
244+ .collect (Collectors .toList ());
245+
246+ excludeDataTypes .addAll (Arrays .asList ("utmstack" , "UTMStack" , DataSourceConstants .IBM_AS400_TYPE ));
247+
248+ List <UtmDataInputStatus > sources = dataInputStatusRepository .extractSourcesToExport (excludeDataTypes );
249+ if (CollectionUtils .isEmpty (sources )) {
250+ return ;
279251 }
280- // Finally, delete excluded assets
281- networkScanRepository .deleteAllAssetsByDataType (excludeOfTypes );
252+
253+ Map <String , Boolean > sourcesWithStatus = extractSourcesWithUpDownStatus (sources );
254+
255+ List <String > keys = new ArrayList <>(sourcesWithStatus .keySet ());
256+ List <UtmNetworkScan > assets = networkScanRepository .findByAssetIpInOrAssetNameIn (keys , keys );
257+
258+ Map <String , UtmNetworkScan > assetsByKey = new HashMap <>();
259+
260+ assets .forEach (a -> {
261+ if (StringUtils .hasText (a .getAssetIp ())) assetsByKey .put (a .getAssetIp (), a );
262+ if (StringUtils .hasText (a .getAssetName ())) assetsByKey .put (a .getAssetName (), a );
263+ });
264+
265+ for (Map .Entry <String , Boolean > entry : sourcesWithStatus .entrySet ()) {
266+ String key = entry .getKey ();
267+ Boolean alive = entry .getValue ();
268+
269+ UtmNetworkScan asset = assetsByKey .get (key );
270+
271+ if (asset != null ) {
272+ if (asset .getUpdateLevel () == null || asset .getUpdateLevel ().equals (UpdateLevel .DATASOURCE ) || asset .getUpdateLevel ().equals (UpdateLevel .AGENT )) {
273+ asset .assetAlive (alive )
274+ .updateLevel (UpdateLevel .DATASOURCE )
275+ .assetStatus (AssetStatus .CHECK )
276+ .modifiedAt (LocalDateTime .now ().toInstant (ZoneOffset .UTC ));
277+
278+ networkScanService .save (asset );
279+ }
280+ } else {
281+ networkScanService .save (new UtmNetworkScan (key , alive ));
282+ }
283+ }
284+
285+ assets .forEach (asset -> {
286+ boolean missing = !sourcesWithStatus .containsKey (asset .getAssetIp ())
287+ && !sourcesWithStatus .containsKey (asset .getAssetName ());
288+
289+ if (missing && UpdateLevel .DATASOURCE .equals (asset .getUpdateLevel ())) {
290+ asset .assetStatus (AssetStatus .MISSING )
291+ .updateLevel (null )
292+ .modifiedAt (LocalDateTime .now ().toInstant (ZoneOffset .UTC ));
293+
294+ networkScanService .save (asset );
295+ }
296+ });
297+
298+ networkScanRepository .deleteAllAssetsByDataType (excludeDataTypes );
299+
282300 } catch (Exception e ) {
301+ log .error ("{}: Error synchronizing sources to assets - {}" , ctx , e .getMessage (), e );
283302 throw new RuntimeException (ctx + ": " + e .getLocalizedMessage ());
284303 }
285304 }
0 commit comments