Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 69 additions & 48 deletions pysdkit/_itd/itd.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
Created on 2025/01/12 12:24:54
@author: Whenxuan Wang
@email: wwhenxuan@gmail.com
not done!!!
"""
import numpy as np

Expand Down Expand Up @@ -35,30 +34,45 @@ def __str__(self) -> str:
return "Intrinsic Time-Scale Decomposition (ITD)"

def stop_iter(self, x, counter, E_x):
"""Stop the main iteration loop"""
"""Determine whether the ITD iteration should stop.

:param x: current residual signal
:param counter: number of iterations completed
:param E_x: energy of the original input signal
:return: True if decomposition should stop
"""
if counter > self.N_max:
return True

Exx = np.sum(x**2)
if Exx <= 0.01 * E_x:
return True

# 获取输入信号的峰值
pks1, _ = find_peaks(x) # 极大值位置
pks2, _ = find_peaks(-x) # 极小值位置
pks = np.union1d(pks1, pks2) # 所有的极值点
pks1, _ = find_peaks(x)
pks2, _ = find_peaks(-x)
pks = np.union1d(pks1, pks2)

if len(pks) <= 7:
return True

# EOF
return False

@staticmethod
def itd_baseline_extract(x):
"""This function is to calculate the baseline L of the signal x."""
"""
Calculate the baseline L of the signal x using the ITD algorithm.

The baseline is constructed by computing control points (LK) at each
extremum as a weighted average (alpha=0.5) of the signal value and the
interpolated opposite envelope, then piecewise-linearly interpolating
between consecutive extrema using signal-value-based slopes.

:param x: input 1D signal array
:return: tuple (L, H) where L is the baseline and H = x - L is the
proper rotation component
"""
length = len(x)
t = np.arange(0, length)
t = np.arange(length)
alpha = 0.5

idx_max, _ = find_peaks(x)
Expand All @@ -67,68 +81,75 @@ def itd_baseline_extract(x):
idx_min, _ = find_peaks(-x)
val_min = x[idx_min]

if len(idx_max) == 0 or len(idx_min) == 0:
return x.copy(), np.zeros_like(x)

idx_cb = np.union1d(idx_max, idx_min)

# Check the boundary conditions

# Left side
if np.min(idx_max) < np.min(idx_min):
idx_min = np.append(idx_max[0], idx_min)
val_min = np.append(val_min[0], val_min)
elif np.min(idx_max) > np.min(idx_min):
idx_max = np.append(idx_min[0], idx_max)
val_max = np.append(val_max[0], val_max)

# Right side
if np.max(idx_max) > np.max(idx_min):
idx_min = np.append(idx_min, idx_max[-1])
val_min = np.append(val_min, val_min[-1])
elif np.max(idx_max) < np.max(idx_min):
idx_max = np.append(idx_max, idx_min[-1])
val_max = np.append(val_max, val_max[-1])

# Compute the LK points
# 创建插值函数对象,使用线性插值
# Pad boundary so that idx_max and idx_min span the same index range.
# When the first (or last) extremum on one side has no counterpart,
# mirror the opposite extremum's position with the signal value there.
if idx_max[0] < idx_min[0]:
idx_min = np.concatenate(([idx_max[0]], idx_min))
val_min = np.concatenate(([x[idx_max[0]]], val_min))
elif idx_min[0] < idx_max[0]:
idx_max = np.concatenate(([idx_min[0]], idx_max))
val_max = np.concatenate(([x[idx_min[0]]], val_max))

if idx_max[-1] > idx_min[-1]:
idx_min = np.concatenate((idx_min, [idx_max[-1]]))
val_min = np.concatenate((val_min, [x[idx_max[-1]]]))
elif idx_min[-1] > idx_max[-1]:
idx_max = np.concatenate((idx_max, [idx_min[-1]]))
val_max = np.concatenate((val_max, [x[idx_min[-1]]]))

Max_line_interp = interp1d(
idx_max, val_max, kind="linear", fill_value="extrapolate"
)
Min_line_interp = interp1d(
idx_min, val_min, kind="linear", fill_value="extrapolate"
)

# 计算插值点的值
Max_line = Max_line_interp(t)
Min_line = Min_line_interp(t)

LK1 = alpha * Max_line[idx_min] + val_min * (1 - alpha)
LK2 = alpha * Min_line[idx_max] + val_max * (1 - alpha)
# LK control points: weighted average at each extremum
LK1_vals = alpha * Max_line[idx_min] + (1 - alpha) * val_min
LK2_vals = alpha * Min_line[idx_max] + (1 - alpha) * val_max

LK1 = np.append(idx_min.reshape(-1, 1), LK1.reshape(-1, 1))
LK2 = np.append(idx_max.reshape(-1, 1), LK2.reshape(-1, 1))
# Build (position, value) pairs and merge
LK1_pts = np.column_stack((idx_min, LK1_vals))
LK2_pts = np.column_stack((idx_max, LK2_vals))
LK = np.vstack((LK1_pts, LK2_pts))

print(LK1.shape, LK2.shape)
# Sort by position and remove duplicates at the boundaries
order = np.argsort(LK[:, 0])
LK = LK[order]

LK = np.vstack((LK1, LK2)).T
# Remove the padded boundary duplicates (first and last are mirrors)
if len(LK) > 2:
LK = LK[1:-1]

LK_col_2 = np.argsort(LK[:, 0])
LK_sorted = LK[LK_col_2, :]
LK = LK_sorted[1:-1, :]
# Clamp to signal boundaries
LK = np.vstack((
[0, LK[0, 1]],
LK,
[length - 1, LK[-1, 1]],
))

LK = np.vstack((np.array([0, LK[0, 1]]), LK, np.array([length - 1, LK[-1, 1]])))

# Compute the Lt points
idx_Xk = np.hstack((0, idx_cb, length - 1)).reshape([len(idx_cb) + 2])
# Compute baseline by piecewise-linear interpolation between extrema
idx_Xk = np.concatenate(([0], idx_cb, [length - 1])).astype(int)

L = np.zeros(length)
for i in range(0, len(idx_Xk) - 1):
for i in range(len(idx_Xk) - 1):
denom = x[idx_Xk[i + 1]] - x[idx_Xk[i]]
kij = (LK[i + 1, 1] - LK[i, 1]) / denom if denom != 0 else 0.0
for j in range(idx_Xk[i], idx_Xk[i + 1]):
kij = (LK[i + 1, 1] - LK[i, 1]) / (
x[idx_Xk[i + 1]] - x[idx_Xk[i]]
) # compute the slope K
L[j] = LK[i, 1] + kij * (x[j] - x[idx_Xk[i]])

L[length - 1] = LK[-1, 1]

H = x - L
print(H)
return L, H

def fit_transform(self, signal: np.ndarray) -> np.ndarray:
Expand Down
116 changes: 116 additions & 0 deletions pysdkit/tests/test_itd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# -*- coding: utf-8 -*-
"""
Tests for the Intrinsic Time-Scale Decomposition (ITD) algorithm.
"""
import unittest
import numpy as np

from pysdkit import ITD
from pysdkit.data import test_emd, test_univariate_signal


class ITDTest(unittest.TestCase):
"""Test that ITD runs correctly on various signal types."""

def test_fit_transform(self) -> None:
"""Verify signal decomposition on built-in test signals."""
itd = ITD(N_max=5)
for index in range(1, 4):
time, signal = test_univariate_signal(case=index)
IMFs = itd.fit_transform(signal)
dim = len(IMFs.shape)
self.assertEqual(
first=dim,
second=2,
msg="The output shape of the decomposed signal is wrong",
)
_, length = IMFs.shape
self.assertEqual(
first=len(signal),
second=length,
msg="Wrong length of decomposed signal",
)

def test_default_call(self) -> None:
"""Verify the __call__ interface works."""
time, signal = test_emd()
itd = ITD(N_max=5)
IMFs = itd(signal)

dim = len(IMFs.shape)
self.assertEqual(
first=dim,
second=2,
msg="The output shape of the decomposed signal is wrong",
)
_, length = IMFs.shape
self.assertEqual(
first=len(signal), second=length, msg="Wrong length of decomposed signal"
)

def test_reconstruction(self) -> None:
"""Verify that IMFs sum back to the original signal."""
time, signal = test_univariate_signal(case=1)
itd = ITD(N_max=5)
IMFs = itd.fit_transform(signal)

reconstructed = np.sum(IMFs, axis=0)
self.assertTrue(
np.allclose(signal, reconstructed, atol=1e-10),
"IMFs should reconstruct the original signal",
)

def test_near_monotonic_signal(self) -> None:
"""Verify ITD handles near-monotonic signals without crashing (issue #29)."""
np.random.seed(42)
N = 200
t = np.linspace(0, 1, N)
signal = -1100 * t + 0.01 * np.sin(50 * t) + 0.005 * np.random.randn(N)

itd = ITD(N_max=3)
IMFs = itd.fit_transform(signal)

self.assertTrue(IMFs.shape[0] >= 1, "Should produce at least one component")
self.assertEqual(IMFs.shape[1], N, "Output length should match input")
reconstructed = np.sum(IMFs, axis=0)
self.assertTrue(
np.allclose(signal, reconstructed, atol=1e-10),
"IMFs should reconstruct the original signal",
)

def test_pure_trend(self) -> None:
"""Verify ITD handles a pure trend (no oscillation)."""
signal = np.linspace(0, 10, 100)
itd = ITD(N_max=3)
IMFs = itd.fit_transform(signal)

self.assertTrue(IMFs.shape[0] >= 1, "Should produce at least one component")
reconstructed = np.sum(IMFs, axis=0)
self.assertTrue(
np.allclose(signal, reconstructed, atol=1e-10),
"IMFs should reconstruct the original signal",
)

def test_asymmetric_extrema(self) -> None:
"""Verify ITD works when maxima and minima counts differ."""
t = np.linspace(0, 4 * np.pi, 500)
signal = np.sin(t) + 0.5 * np.sin(3 * t) + 0.3 * np.cos(0.5 * t)

itd = ITD(N_max=5)
IMFs = itd.fit_transform(signal)

self.assertTrue(IMFs.shape[0] >= 2, "Should produce multiple components")
reconstructed = np.sum(IMFs, axis=0)
self.assertTrue(
np.allclose(signal, reconstructed, atol=1e-10),
"IMFs should reconstruct the original signal",
)

def test_str(self) -> None:
"""Verify string representation."""
itd = ITD()
self.assertIn("ITD", str(itd))


if __name__ == "__main__":
unittest.main()