diff --git a/pysdkit/_itd/itd.py b/pysdkit/_itd/itd.py index 8df9533..6bb6244 100644 --- a/pysdkit/_itd/itd.py +++ b/pysdkit/_itd/itd.py @@ -3,7 +3,6 @@ Created on 2025/01/12 12:24:54 @author: Whenxuan Wang @email: wwhenxuan@gmail.com -not done!!! """ import numpy as np @@ -35,7 +34,13 @@ def __str__(self) -> str: return "Intrinsic Time-Scale Decomposition (ITD)" def stop_iter(self, x, counter, E_x): - """Stop the main iteration loop""" + """Determine whether the ITD iteration should stop. + + :param x: current residual signal + :param counter: number of iterations completed + :param E_x: energy of the original input signal + :return: True if decomposition should stop + """ if counter > self.N_max: return True @@ -43,22 +48,31 @@ def stop_iter(self, x, counter, E_x): if Exx <= 0.01 * E_x: return True - # 获取输入信号的峰值 - pks1, _ = find_peaks(x) # 极大值位置 - pks2, _ = find_peaks(-x) # 极小值位置 - pks = np.union1d(pks1, pks2) # 所有的极值点 + pks1, _ = find_peaks(x) + pks2, _ = find_peaks(-x) + pks = np.union1d(pks1, pks2) if len(pks) <= 7: return True - # EOF return False @staticmethod def itd_baseline_extract(x): - """This function is to calculate the baseline L of the signal x.""" + """ + Calculate the baseline L of the signal x using the ITD algorithm. + + The baseline is constructed by computing control points (LK) at each + extremum as a weighted average (alpha=0.5) of the signal value and the + interpolated opposite envelope, then piecewise-linearly interpolating + between consecutive extrema using signal-value-based slopes. + + :param x: input 1D signal array + :return: tuple (L, H) where L is the baseline and H = x - L is the + proper rotation component + """ length = len(x) - t = np.arange(0, length) + t = np.arange(length) alpha = 0.5 idx_max, _ = find_peaks(x) @@ -67,28 +81,28 @@ def itd_baseline_extract(x): idx_min, _ = find_peaks(-x) val_min = x[idx_min] + if len(idx_max) == 0 or len(idx_min) == 0: + return x.copy(), np.zeros_like(x) + idx_cb = np.union1d(idx_max, idx_min) - # Check the boundary conditions - - # Left side - if np.min(idx_max) < np.min(idx_min): - idx_min = np.append(idx_max[0], idx_min) - val_min = np.append(val_min[0], val_min) - elif np.min(idx_max) > np.min(idx_min): - idx_max = np.append(idx_min[0], idx_max) - val_max = np.append(val_max[0], val_max) - - # Right side - if np.max(idx_max) > np.max(idx_min): - idx_min = np.append(idx_min, idx_max[-1]) - val_min = np.append(val_min, val_min[-1]) - elif np.max(idx_max) < np.max(idx_min): - idx_max = np.append(idx_max, idx_min[-1]) - val_max = np.append(val_max, val_max[-1]) - - # Compute the LK points - # 创建插值函数对象,使用线性插值 + # Pad boundary so that idx_max and idx_min span the same index range. + # When the first (or last) extremum on one side has no counterpart, + # mirror the opposite extremum's position with the signal value there. + if idx_max[0] < idx_min[0]: + idx_min = np.concatenate(([idx_max[0]], idx_min)) + val_min = np.concatenate(([x[idx_max[0]]], val_min)) + elif idx_min[0] < idx_max[0]: + idx_max = np.concatenate(([idx_min[0]], idx_max)) + val_max = np.concatenate(([x[idx_min[0]]], val_max)) + + if idx_max[-1] > idx_min[-1]: + idx_min = np.concatenate((idx_min, [idx_max[-1]])) + val_min = np.concatenate((val_min, [x[idx_max[-1]]])) + elif idx_min[-1] > idx_max[-1]: + idx_max = np.concatenate((idx_max, [idx_min[-1]])) + val_max = np.concatenate((val_max, [x[idx_min[-1]]])) + Max_line_interp = interp1d( idx_max, val_max, kind="linear", fill_value="extrapolate" ) @@ -96,39 +110,46 @@ def itd_baseline_extract(x): idx_min, val_min, kind="linear", fill_value="extrapolate" ) - # 计算插值点的值 Max_line = Max_line_interp(t) Min_line = Min_line_interp(t) - LK1 = alpha * Max_line[idx_min] + val_min * (1 - alpha) - LK2 = alpha * Min_line[idx_max] + val_max * (1 - alpha) + # LK control points: weighted average at each extremum + LK1_vals = alpha * Max_line[idx_min] + (1 - alpha) * val_min + LK2_vals = alpha * Min_line[idx_max] + (1 - alpha) * val_max - LK1 = np.append(idx_min.reshape(-1, 1), LK1.reshape(-1, 1)) - LK2 = np.append(idx_max.reshape(-1, 1), LK2.reshape(-1, 1)) + # Build (position, value) pairs and merge + LK1_pts = np.column_stack((idx_min, LK1_vals)) + LK2_pts = np.column_stack((idx_max, LK2_vals)) + LK = np.vstack((LK1_pts, LK2_pts)) - print(LK1.shape, LK2.shape) + # Sort by position and remove duplicates at the boundaries + order = np.argsort(LK[:, 0]) + LK = LK[order] - LK = np.vstack((LK1, LK2)).T + # Remove the padded boundary duplicates (first and last are mirrors) + if len(LK) > 2: + LK = LK[1:-1] - LK_col_2 = np.argsort(LK[:, 0]) - LK_sorted = LK[LK_col_2, :] - LK = LK_sorted[1:-1, :] + # Clamp to signal boundaries + LK = np.vstack(( + [0, LK[0, 1]], + LK, + [length - 1, LK[-1, 1]], + )) - LK = np.vstack((np.array([0, LK[0, 1]]), LK, np.array([length - 1, LK[-1, 1]]))) - - # Compute the Lt points - idx_Xk = np.hstack((0, idx_cb, length - 1)).reshape([len(idx_cb) + 2]) + # Compute baseline by piecewise-linear interpolation between extrema + idx_Xk = np.concatenate(([0], idx_cb, [length - 1])).astype(int) L = np.zeros(length) - for i in range(0, len(idx_Xk) - 1): + for i in range(len(idx_Xk) - 1): + denom = x[idx_Xk[i + 1]] - x[idx_Xk[i]] + kij = (LK[i + 1, 1] - LK[i, 1]) / denom if denom != 0 else 0.0 for j in range(idx_Xk[i], idx_Xk[i + 1]): - kij = (LK[i + 1, 1] - LK[i, 1]) / ( - x[idx_Xk[i + 1]] - x[idx_Xk[i]] - ) # compute the slope K L[j] = LK[i, 1] + kij * (x[j] - x[idx_Xk[i]]) + L[length - 1] = LK[-1, 1] + H = x - L - print(H) return L, H def fit_transform(self, signal: np.ndarray) -> np.ndarray: diff --git a/pysdkit/tests/test_itd.py b/pysdkit/tests/test_itd.py new file mode 100644 index 0000000..d21e15c --- /dev/null +++ b/pysdkit/tests/test_itd.py @@ -0,0 +1,116 @@ +# -*- coding: utf-8 -*- +""" +Tests for the Intrinsic Time-Scale Decomposition (ITD) algorithm. +""" +import unittest +import numpy as np + +from pysdkit import ITD +from pysdkit.data import test_emd, test_univariate_signal + + +class ITDTest(unittest.TestCase): + """Test that ITD runs correctly on various signal types.""" + + def test_fit_transform(self) -> None: + """Verify signal decomposition on built-in test signals.""" + itd = ITD(N_max=5) + for index in range(1, 4): + time, signal = test_univariate_signal(case=index) + IMFs = itd.fit_transform(signal) + dim = len(IMFs.shape) + self.assertEqual( + first=dim, + second=2, + msg="The output shape of the decomposed signal is wrong", + ) + _, length = IMFs.shape + self.assertEqual( + first=len(signal), + second=length, + msg="Wrong length of decomposed signal", + ) + + def test_default_call(self) -> None: + """Verify the __call__ interface works.""" + time, signal = test_emd() + itd = ITD(N_max=5) + IMFs = itd(signal) + + dim = len(IMFs.shape) + self.assertEqual( + first=dim, + second=2, + msg="The output shape of the decomposed signal is wrong", + ) + _, length = IMFs.shape + self.assertEqual( + first=len(signal), second=length, msg="Wrong length of decomposed signal" + ) + + def test_reconstruction(self) -> None: + """Verify that IMFs sum back to the original signal.""" + time, signal = test_univariate_signal(case=1) + itd = ITD(N_max=5) + IMFs = itd.fit_transform(signal) + + reconstructed = np.sum(IMFs, axis=0) + self.assertTrue( + np.allclose(signal, reconstructed, atol=1e-10), + "IMFs should reconstruct the original signal", + ) + + def test_near_monotonic_signal(self) -> None: + """Verify ITD handles near-monotonic signals without crashing (issue #29).""" + np.random.seed(42) + N = 200 + t = np.linspace(0, 1, N) + signal = -1100 * t + 0.01 * np.sin(50 * t) + 0.005 * np.random.randn(N) + + itd = ITD(N_max=3) + IMFs = itd.fit_transform(signal) + + self.assertTrue(IMFs.shape[0] >= 1, "Should produce at least one component") + self.assertEqual(IMFs.shape[1], N, "Output length should match input") + reconstructed = np.sum(IMFs, axis=0) + self.assertTrue( + np.allclose(signal, reconstructed, atol=1e-10), + "IMFs should reconstruct the original signal", + ) + + def test_pure_trend(self) -> None: + """Verify ITD handles a pure trend (no oscillation).""" + signal = np.linspace(0, 10, 100) + itd = ITD(N_max=3) + IMFs = itd.fit_transform(signal) + + self.assertTrue(IMFs.shape[0] >= 1, "Should produce at least one component") + reconstructed = np.sum(IMFs, axis=0) + self.assertTrue( + np.allclose(signal, reconstructed, atol=1e-10), + "IMFs should reconstruct the original signal", + ) + + def test_asymmetric_extrema(self) -> None: + """Verify ITD works when maxima and minima counts differ.""" + t = np.linspace(0, 4 * np.pi, 500) + signal = np.sin(t) + 0.5 * np.sin(3 * t) + 0.3 * np.cos(0.5 * t) + + itd = ITD(N_max=5) + IMFs = itd.fit_transform(signal) + + self.assertTrue(IMFs.shape[0] >= 2, "Should produce multiple components") + reconstructed = np.sum(IMFs, axis=0) + self.assertTrue( + np.allclose(signal, reconstructed, atol=1e-10), + "IMFs should reconstruct the original signal", + ) + + def test_str(self) -> None: + """Verify string representation.""" + itd = ITD() + self.assertIn("ITD", str(itd)) + + +if __name__ == "__main__": + unittest.main()