55"""
66
77import json
8+ from pathlib import Path
9+ import time
810
911import numpy as np
1012import pyTEMlib .image_tools as it
1113import pyTEMlib .probe_tools as pt
1214import tango
15+ from ase .io import read
1316from ase import Atoms
1417from ase .build import bulk
15- from tango import AttrWriteType , DevState
16- from tango .server import Device , attribute , device_property
18+ from tango import AttrWriteType , DevState , DevEncoded
19+ from tango .server import Device , attribute , device_property , command
1720
1821from asyncroscopy .Microscope import Microscope
22+ from asyncroscopy .simulation import StemSim as dg # dg means datagenerator --> needs better naming -> using directly as used in twisted version for now
23+
24+
25+ HERE = Path (__file__ ).resolve ().parent
26+ PROJECT_ROOT = HERE .parent # removes "servers"
1927
2028
2129class ThermoDigitalTwin (Microscope ):
2230 """
2331 Persistent ASE-backed sample simulation with stage-coupled viewport rendering.
2432 """
2533
34+ # ------------------------------------------------------------------
35+ # Device properties — configure in Tango DB per deployment
36+ # ------------------------------------------------------------------
37+
2638 sample_seed = device_property (
2739 dtype = int ,
2840 default_value = 12345 ,
@@ -109,6 +121,7 @@ def _connect_detector_proxies(self) -> None:
109121 "eds" : self .eds_device_address ,
110122 "stage" : self .stage_device_address ,
111123 "scan" : self .scan_device_address ,
124+ "corrector" : self .corrector_device_address
112125 }
113126 for name , address in addresses .items ():
114127 if not address :
@@ -178,7 +191,9 @@ def _sub_pix_gaussian(size: int = 11, sigma: float = 0.8, dx: float = 0.0, dy: f
178191 g = np .exp (- (((xx + dx ) ** 2 + (yy + dy ) ** 2 ) / (2 * sigma ** 2 )))
179192 m = np .max (g )
180193 return g / m if m > 0 else g
181-
194+ # ------------------------------------------------------------------
195+ # simulation helpers ----> Should be put in asyncroscopy/simulation later
196+ # ------------------------------------------------------------------
182197 def _create_pseudo_potential (
183198 self ,
184199 xtal : Atoms ,
@@ -397,6 +412,9 @@ def _generate_sample(self, seed: int) -> None:
397412 self ._cached_pose_key = None
398413 self ._update_view_cache (force = True )
399414
415+ # ------------------------------------------------------------------
416+ # Attribute read methods
417+ # ------------------------------------------------------------------
400418 def read_manufacturer (self ) -> str :
401419 """Read method for the manufacturer attribute."""
402420 return self ._manufacturer
@@ -413,6 +431,57 @@ def write_beam_pos(self, value):
413431 self ._beam_pos_x = float (x )
414432 self ._beam_pos_y = float (y )
415433
434+
435+ # ------------------------------------------------------------------
436+ # Commands
437+ # ------------------------------------------------------------------
438+
439+ @command (dtype_out = DevEncoded )#In PyTango, DevEncoded is a special Tango data type designed to send binary data + a small description string together as a single return value.
440+ def get_scanned_image_with_aberrations (self ) -> tuple [str , bytes ]:
441+ """
442+ Acquire a single STEM image from the named detector.
443+
444+ Parameters
445+ ----------
446+ detector_name:
447+ Name of the detector, e.g. "haadf". Must match a key in
448+ self._detector_proxies.
449+
450+ Returns
451+ -------
452+ DevEncoded = (json_metadata, raw_bytes)
453+ json_metadata includes: shape, dtype, dwell_time, detector,
454+ timestamp, and any other relevant metadata.
455+ raw_bytes is the flat numpy array bytes; reshape using shape from metadata.
456+ """
457+ # check active detectors
458+ scan = self ._detector_proxies .get ("scan" )
459+ corrector = self ._detector_proxies .get ("corrector" )
460+
461+
462+ # Read scan settings from the detector device
463+ dwell_time = scan .dwell_time
464+ imsize = scan .imsize
465+
466+ # Read aberration setting from the corrector
467+ ab = corrector .get_aberrations_coeff_sim ()# is a json string
468+ self .ab = json .loads (ab )
469+
470+ adorned_image = self ._acquire_stem_image_aberrations (imsize , dwell_time , ['haadf' ])
471+
472+ metadata = {
473+ "detector" : 'haadf' ,
474+ "shape" : [imsize , imsize ],
475+ "dtype" : str (adorned_image .dtype ),
476+ "dwell_time" : dwell_time ,
477+ "timestamp" : time .time (),
478+ # TODO: add metadata from adorned_image.metadata when using real AutoScript
479+ }
480+
481+ return json .dumps (metadata ), adorned_image .tobytes ()
482+ # ------------------------------------------------------------------
483+ # Internal acquisition helpers
484+ # ------------------------------------------------------------------
416485 def _acquire_stem_image (self , imsize : int , dwell_time : float , detector_list : list ) -> np .ndarray :
417486 """Simulate STEM image acquisition using convolutions of the pseudo-potential and electron probe."""
418487 self ._sync_stage_from_proxy ()
@@ -457,6 +526,46 @@ def _acquire_stem_image(self, imsize: int, dwell_time: float, detector_list: lis
457526 noisy_image += self ._lowfreq_noise (noisy_image , noise_level = 0.1 , freq_scale = 0.1 , rng = rng ) * blur_noise_level
458527 return np .clip (noisy_image , 0.0 , 1.0 ).astype (np .float32 )
459528
529+
530+ def _acquire_stem_image_aberrations (self , imsize : int , dwell_time : float , detector_list : list ) -> np .ndarray :
531+ """
532+ """
533+ size = imsize
534+ beam_current = 100 # pA
535+ ab = self .ab
536+ ab ['acceleration_voltage' ] = 60e3 # eV
537+ fov = 96 # angstroms
538+ ab ['FOV' ] = fov / 12 # Angstroms
539+ ab ['convergence_angle' ] = 30 # mrad
540+ ab ['wavelength' ] = it .get_wavelength (ab ['acceleration_voltage' ])
541+ cif_path = (
542+ PROJECT_ROOT
543+ / "data"
544+ / "cif_files"
545+ / "WS2_ortho.cif"
546+ )
547+ print ("Reading CIF from:" , cif_path )
548+ xtal = read (cif_path )
549+ xtal = xtal * (30 , 20 , 1 )
550+ positions = xtal .get_positions ()[:, :2 ]
551+ pixel_size = 0.106 # angstrom/pixel
552+ frame = (0 ,fov ,0 ,fov ) # limits of the image in angstroms
553+ potential = dg .create_pseudo_potential (xtal , pixel_size , sigma = 1 , bounds = frame , atom_frame = 11 )
554+ probe = dg .get_probe (ab , potential )
555+ image = dg .convolve_kernel (potential , probe )
556+ noisy_image = dg .lowfreq_noise (image , noise_level = 0.5 , freq_scale = .04 )
557+
558+ scan_time = dwell_time * size * size
559+ counts = scan_time * (beam_current * 1e-12 ) / (1.602e-19 )
560+ sim_im = dg .poisson_noise (noisy_image , counts = counts )
561+ # convert args dict
562+
563+ # time.sleep(1)
564+ image = np .array (image , dtype = np .float32 )
565+ sim_im = np .array (sim_im , dtype = np .float32 )
566+ print (sim_im .shape ) # TODO: the simulation is independent of size and always returns - (905, 905) image -> need to check
567+ return sim_im
568+
460569 def _acquire_stem_image_advanced (
461570 self ,
462571 detector_names : list [str ],
@@ -588,4 +697,4 @@ def get_viewport_metadata(self) -> str:
588697 return json .dumps (metadata )
589698
590699if __name__ == "__main__" :
591- ThermoDigitalTwin .run_server ()
700+ ThermoDigitalTwin .run_server ()
0 commit comments