-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdetect_bar_contact.py
More file actions
executable file
·141 lines (115 loc) · 3.85 KB
/
detect_bar_contact.py
File metadata and controls
executable file
·141 lines (115 loc) · 3.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# Detects contacts with bars in Gazebo
#!/usr/bin/env python
import datetime
import subprocess
from pathlib import Path
import rospy
import pretty_midi
from sensor_msgs.msg import JointState
from std_msgs.msg import String
# velocity threshold to count as a bar hit
VEL_THRESHOLD = 0.005
# time between contacts to register as a separate one
TIME_THRESHOLD = 0.0 # s
# duration of a note (assume fixed for now)
NOTE_DURATION = 0.5 # s
# where to write MIDI files
try:
MIDI_DIR = subprocess.check_output(["rospack", "find", "marimbabot_simulation"]).decode("utf-8").strip()
rospy.loginfo(f"MIDI dir set to {MIDI_DIR}")
except subprocess.CalledProcessError:
rospy.logerr("Could not find MIDI directory. Writing to current directory instead.")
MIDI_DIR = "./"
# names of bars we are interested in
# corresponding joints are bar_<note>/joint
# e.g. "bar_a4" <-> "bar_a4/joint"
BAR_JOINT_NAMES = [
"bar_a4/joint",
"bar_a5/joint",
"bar_a6/joint",
"bar_ais4/joint",
"bar_ais5/joint",
"bar_ais6/joint",
"bar_b4/joint",
"bar_b5/joint",
"bar_b6/joint",
"bar_c4/joint",
"bar_c5/joint",
"bar_c6/joint",
"bar_c7/joint",
"bar_cis4/joint",
"bar_cis5/joint",
"bar_cis6/joint",
"bar_d4/joint",
"bar_d5/joint",
"bar_d6/joint",
"bar_dis4/joint",
"bar_dis5/joint",
"bar_dis6/joint",
"bar_e4/joint",
"bar_e5/joint",
"bar_e6/joint",
"bar_f4/joint",
"bar_f5/joint",
"bar_f6/joint",
"bar_fis4/joint",
"bar_fis5/joint",
"bar_fis6/joint",
"bar_g4/joint",
"bar_g5/joint",
"bar_g6/joint",
"bar_gis4/joint",
"bar_gis5/joint",
"bar_gis6/joint",
]
joint2note = {joint: joint[4 : joint.index("/")] for joint in BAR_JOINT_NAMES}
pub = None
play_start_time = None
last_contact_timestamps = {joint: 0 for joint in BAR_JOINT_NAMES}
# holds tuples (note, start_time)
played_notes = []
def save_midi(notes):
"""Save notes to a MIDI file using pretty_midi."""
midi_dir = Path(MIDI_DIR) / "midi"
midi_dir.mkdir(exist_ok=True)
midi_path = midi_dir / f"{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.midi"
midi = pretty_midi.PrettyMIDI()
instrument_program = pretty_midi.instrument_name_to_program("Marimba")
instrument = pretty_midi.Instrument(program=instrument_program)
for note, start in notes:
note_number = pretty_midi.note_name_to_number(note.replace("is", "#").upper())
note = pretty_midi.Note(
velocity=100,
pitch=note_number,
start=start,
end=start + NOTE_DURATION,
)
instrument.notes.append(note)
midi.instruments.append(instrument)
midi.write(str(midi_path.resolve()))
rospy.loginfo(f"Saved MIDI file to {midi_path}")
def joint_states_callback(message):
for i, name in enumerate(message.name):
if name not in joint2note:
continue
# ignore joint if it has an ongoing contact
if message.header.stamp.secs - last_contact_timestamps[name] < TIME_THRESHOLD:
continue
vel = message.velocity[i]
if vel > VEL_THRESHOLD:
global play_start_time
if play_start_time is None:
play_start_time = message.header.stamp.secs
rospy.loginfo(f"Detected contact with {joint2note[name]} vel={vel:.5f}")
pub.publish(joint2note[name])
last_contact_timestamps[name] = message.header.stamp.secs
played_notes.append((joint2note[name], message.header.stamp.secs - play_start_time))
def listener():
rospy.init_node("detect_bar_contact")
global pub
pub = rospy.Publisher("sim_notes", String, queue_size=1)
rospy.Subscriber("joint_states", JointState, joint_states_callback, queue_size=1)
rospy.spin()
if __name__ == "__main__":
rospy.on_shutdown(lambda: save_midi(played_notes))
listener()