3 from __future__
import absolute_import
4 from __future__
import division
5 from __future__
import print_function
17 import dynamic_reconfigure.server
20 import message_filters
23 from std_srvs.srv
import Trigger
24 from std_srvs.srv
import TriggerResponse
26 from jsk_data.cfg
import DataCollectionServerConfig
30 ext = osp.splitext(filename)[1]
32 pkl.dump(arr, open(filename,
'wb'))
34 np.savez_compressed(filename, arr)
35 elif ext
in [
'.png',
'.jpg']:
36 PIL.Image.fromarray(arr).save(filename)
43 """Server to collect data. 48 - name: /camera/rgb/image_raw 49 msg_class: sensor_msgs/Image 52 - name: /camera/depth/image_raw 53 msg_class: sensor_msgs/Image 57 - key: /in_hand_data_collection_main/object 64 dynamic_reconfigure.server.Server(
67 self.
topics = rospy.get_param(
'~topics', [])
70 required_fields = [
'name',
'msg_class',
'fname',
'savetype']
71 for field
in required_fields:
72 if field
not in topic:
73 jsk_logfatal(
"Required field '{}' for topic is missing" 76 self.
params = rospy.get_param(
'~params', [])
77 self.
slop = rospy.get_param(
'~slop', 0.1)
80 required_fields = [
'key',
'fname',
'savetype']
81 for field
in required_fields:
82 if field
not in param:
83 jsk_logfatal(
"Required field '{}' for param is missing" 87 method = rospy.get_param(
'~method',
'request')
88 use_message_filters = rospy.get_param(
'~message_filters',
False)
91 if rospy.has_param(
'~with_request'):
92 rospy.logwarn(
'Deprecated param: ~with_request, Use ~method')
93 if not rospy.get_param(
'~with_request'):
94 use_message_filters =
True 96 if method ==
'message_filters':
98 'Deprecated param: ~method: message_filters,' 99 'Use ~message_filters: true')
100 use_message_filters =
True 106 msg_class = roslib.message.get_message_class(
108 if use_message_filters:
110 topic[
'name'], msg_class)
112 sub = rospy.Subscriber(
113 topic[
'name'], msg_class, self.
sub_cb,
114 callback_args=topic[
'name'])
115 self.subs.append(sub)
118 if use_message_filters:
119 queue_size = rospy.get_param(
'~queue_size', 10)
120 approximate_sync = rospy.get_param(
'~approximate_sync',
False)
122 slop = rospy.get_param(
'~slop', 0.1)
123 self.
sync = message_filters.ApproximateTimeSynchronizer(
124 self.
subs, queue_size=queue_size, slop=slop)
127 self.
subs, queue_size=queue_size)
130 if method ==
'request':
131 if use_message_filters:
136 self.
server = rospy.Service(
138 elif method ==
'timer':
139 duration = rospy.Duration(1.0 / rospy.get_param(
'~hz', 1.0))
145 if use_message_filters:
151 if use_message_filters:
155 '~use_filters: False, ~method: None is not supported')
165 for sub
in self.
subs:
169 for topic, msg
in zip(self.
topics, msgs):
170 self.
msg[topic[
'name']] = {
171 'stamp': msg.header.stamp,
180 self.
msg[topic_name] = {
181 'stamp': msg.header.stamp
if msg._has_header
else rospy.Time.now(),
186 if savetype ==
'ColorImage':
187 bridge = cv_bridge.CvBridge()
188 img = bridge.imgmsg_to_cv2(msg,
'rgb8')
190 elif savetype ==
'DepthImage':
191 bridge = cv_bridge.CvBridge()
192 depth = bridge.imgmsg_to_cv2(msg)
194 elif savetype ==
'LabelImage':
195 bridge = cv_bridge.CvBridge()
196 label = bridge.imgmsg_to_cv2(msg)
198 elif savetype ==
'YAML':
199 msg_yaml = genpy.message.strify_message(msg)
200 with open(filename,
'w')
as f:
203 rospy.logerr(
'Unexpected savetype for topic: {}'.format(savetype))
207 value = rospy.get_param(param)
208 if savetype ==
'Text':
209 with open(filename,
'w')
as f:
211 elif savetype ==
'YAML':
212 content = yaml.safe_dump(value, allow_unicode=
True,
213 default_flow_style=
False)
214 with open(filename,
'w')
as f:
217 rospy.logerr(
'Unexpected savetype for param: {}'.format(savetype))
221 stamp = self.
msg[self.
topics[0][
'name']][
'stamp']
222 save_dir = osp.join(self.
save_dir, str(stamp.to_nsec()))
223 if not osp.exists(save_dir):
224 os.makedirs(save_dir)
226 msg = self.
msg[topic[
'name']][
'msg']
227 filename = osp.join(save_dir, topic[
'fname'])
228 self.
save_topic(topic[
'name'], msg, topic[
'savetype'], filename)
230 filename = osp.join(save_dir, param[
'fname'])
231 self.
save_param(param[
'key'], param[
'savetype'], filename)
232 msg =
'Saved data to {}'.format(save_dir)
237 now = rospy.Time.now()
239 while len(saving_msgs) < len(self.
topics):
241 if topic[
'name']
in saving_msgs:
243 if topic[
'name']
not in self.
msg:
245 stamp = self.
msg[topic[
'name']][
'stamp']
246 if abs(now - stamp) < rospy.Duration(self.
slop):
247 saving_msgs[topic[
'name']] = self.
msg[topic[
'name']][
'msg']
249 msg =
'timeout for topic [{}]. try bigger slop'.format(
256 save_dir = osp.join(self.
save_dir, str(now.to_nsec()))
260 if not osp.exists(save_dir):
261 os.makedirs(save_dir)
263 msg = saving_msgs[topic[
'name']]
264 filename = osp.join(save_dir, topic[
'fname'])
265 self.
save_topic(topic[
'name'], msg, topic[
'savetype'], filename)
267 filename = osp.join(save_dir, param[
'fname'])
268 self.
save_param(param[
'key'], param[
'savetype'], filename)
269 msg =
'Saved data to {}'.format(save_dir)
275 return TriggerResponse(success=
True)
279 return TriggerResponse(success=
True)
282 result, msg = self.
_save()
284 return TriggerResponse(success=
True, message=msg)
286 return TriggerResponse(success=
False, message=msg)
291 return TriggerResponse(success=
True, message=msg)
293 return TriggerResponse(success=
False, message=msg)
297 result, msg = self.
_save()
304 if __name__ ==
'__main__':
305 rospy.init_node(
'data_collection_server')
def save_topic(self, topic, msg, savetype, filename)
def sync_sub_cb(self, msgs)
def end_service_cb(self, req)
def service_cb(self, req)
def timer_cb(self, event)
def save_param(self, param, savetype, filename)
def reconfig_cb(self, config, level)
def sync_service_cb(self, req)
def sync_timer_cb(self, event)
def start_service_cb(self, req)
def sub_cb(self, msg, topic_name)
def dump_ndarray(filename, arr)
def sync_sub_and_save_cb(self, msgs)