1 /*2 * Copyright (c) 2008, Willow Garage, Inc.
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of the Willow Garage, Inc. nor the names of its
14 * contributors may be used to endorse or promote products derived from
15 * this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28*/
29
32#ifndef TF_MESSAGE_FILTER_H33 #define TF_MESSAGE_FILTER_H
34
35 #include
36 #include
37 #include
38
39 #include
40 #include
41 #include
42 #include
43 #include
44 #include
45 #include
46 #include
47 #include
48
49 #include
50
51 #include
52 #include
53
54 #define TF_MESSAGEFILTER_DEBUG(fmt, ...) \
55 ROS_DEBUG_NAMED("message_filter", "MessageFilter [target=%s]:"fmt, getTargetFramesString().c_str(), __VA_ARGS__)56
57 #define TF_MESSAGEFILTER_WARN(fmt, ...) \
58 ROS_WARN_NAMED("message_filter", "MessageFilter [target=%s]:"fmt, getTargetFramesString().c_str(), __VA_ARGS__)59
60 namespacetf61{62
63 namespacefilter_failure_reasons64{65 enumFilterFailureReason66{68Unknown,70OutTheBack,72EmptyFrameID,73};74}75typedef filter_failure_reasons::FilterFailureReason FilterFailureReason;76
77 classMessageFilterBase78{79 public:80 virtual ~MessageFilterBase(){}81 virtual void clear() = 0;82 virtual void setTargetFrame(const std::string& target_frame) = 0;83 virtual void setTargetFrames(const std::vector<:string>& target_frames) = 0;84 virtual void setTolerance(const ros::Duration& tolerance) = 0;85 virtual void setQueueSize( uint32_t new_queue_size ) = 0;86 virtual uint32_t getQueueSize() = 0;87};88
105 template
106 class MessageFilter : public MessageFilterBase, public message_filters::SimpleFilter
107{108 public:109 typedef boost::shared_ptrMConstPtr;110 typedef ros::MessageEventMEvent;111 typedef boost::functionFailureCallback;112 typedef boost::signals2::signalFailureSignal;113
123 MessageFilter(Transformer& tf, const std::string& target_frame, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle(), ros::Duration max_rate = ros::Duration(0.01))124: tf_(tf)125, nh_(nh)126, max_rate_(max_rate)127, queue_size_(queue_size)128{129init();130
131setTargetFrame(target_frame);132}133
144 template
145 MessageFilter(F& f, Transformer& tf, const std::string& target_frame, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle(), ros::Duration max_rate = ros::Duration(0.01))146: tf_(tf)147, nh_(nh)148, max_rate_(max_rate)149, queue_size_(queue_size)150{151init();152
153setTargetFrame(target_frame);154
155connectInput(f);156}157
161 template
162 void connectInput(F&f)163{164message_connection_.disconnect();165 message_connection_ = f.registerCallback(&MessageFilter::incomingMessage, this);166}167
171 ~MessageFilter()172{173 //Explicitly stop callbacks; they could execute after we're destroyed
178clear();179
180 TF_MESSAGEFILTER_DEBUG("Successful Transforms: %llu, Failed Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu",181 (long long unsigned int)successful_transform_count_, (long long unsigned int)failed_transform_count_,182 (long long unsigned int)failed_out_the_back_count_, (long long unsigned int)transform_message_count_,183 (long long unsigned int)incoming_message_count_, (long long unsigned int)dropped_message_count_);184
185}186
190 void setTargetFrame(const std::string&target_frame)191{192 std::vector<:string>frames;193frames.push_back(target_frame);194setTargetFrames(frames);195}196
200 void setTargetFrames(const std::vector<:string>&target_frames)201{202boost::mutex::scoped_lock list_lock(messages_mutex_);203boost::mutex::scoped_lock string_lock(target_frames_string_mutex_);204
205 target_frames_ =target_frames;206
207std::stringstream ss;208 for (std::vector<:string>::iterator it = target_frames_.begin(); it != target_frames_.end(); ++it)209{210 ss << *it << " ";211}212 target_frames_string_ =ss.str();213}217 std::stringgetTargetFramesString()218{219 boost::mutex::scoped_lock lock(target_frames_string_mutex_);220 returntarget_frames_string_;221};222
226 void setTolerance(const ros::Duration&tolerance)227{228 time_tolerance_ =tolerance;229}230
234 voidclear()235{236 boost::mutex::scoped_lock lock(messages_mutex_);237
238 TF_MESSAGEFILTER_DEBUG("%s", "Cleared");239
240messages_.clear();241 message_count_ = 0;242
243 warned_about_unresolved_name_ = false;244 warned_about_empty_frame_id_ = false;245}246
247 void add(const MEvent&evt)248{249 boost::mutex::scoped_lock lock(messages_mutex_);250
251testMessages();252
253 if (!testMessage(evt))254{255 //If this message is about to push us past our queue size, erase the oldest message
256 if (queue_size_ != 0 && message_count_ + 1 >queue_size_)257{258 ++dropped_message_count_;259 const MEvent& front =messages_.front();260TF_MESSAGEFILTER_DEBUG(261 "Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)",262message_count_,263 ros::message_traits::FrameId::value(*front.getMessage()).c_str(),264 ros::message_traits::TimeStamp::value(*front.getMessage()).toSec());265signalFailure(messages_.front(), filter_failure_reasons::Unknown);266
267messages_.pop_front();268 --message_count_;269}270
271 //Add the message to our list
272messages_.push_back(evt);273 ++message_count_;274}275
276TF_MESSAGEFILTER_DEBUG(277 "Added message in frame %s at time %.3f, count now %d",278 ros::message_traits::FrameId::value(*evt.getMessage()).c_str(),279 ros::message_traits::TimeStamp::value(*evt.getMessage()).toSec(),280message_count_);281
282 ++incoming_message_count_;283}284
290 void add(const MConstPtr&message)291{292
293 boost::shared_ptr<:map std::string> > header(new std::map<:string std::string>);294 (*header)["callerid"] = "unknown";295add(MEvent(message, header, ros::Time::now()));296}297
302 message_filters::Connection registerFailureCallback(const FailureCallback&callback)303{304 boost::mutex::scoped_lock lock(failure_signal_mutex_);305 return message_filters::Connection(boost::bind(&MessageFilter::disconnectFailure, this, _1), failure_signal_.connect(callback));306}307
308 virtual voidsetQueueSize( uint32_t new_queue_size )309{310 queue_size_ =new_queue_size;311}312
313 virtualuint32_t getQueueSize()314{315 returnqueue_size_;316}317
318 private:319
320 voidinit()321{322 message_count_ = 0;323 new_transforms_ = false;324 successful_transform_count_ = 0;325 failed_transform_count_ = 0;326 failed_out_the_back_count_ = 0;327 transform_message_count_ = 0;328 incoming_message_count_ = 0;329 dropped_message_count_ = 0;330 time_tolerance_ = ros::Duration(0.0);331 warned_about_unresolved_name_ = false;332 warned_about_empty_frame_id_ = false;333
334 tf_connection_ = tf_.addTransformsChangedListener(boost::bind(&MessageFilter::transformsChanged, this));335
336 max_rate_timer_ = nh_.createTimer(max_rate_, &MessageFilter::maxRateTimerCallback, this);337}338
339 typedef std::listL_Event;340
341 bool testMessage(const MEvent&evt)342{343 const MConstPtr& message =evt.getMessage();344 std::string callerid =evt.getPublisherName();345 std::string frame_id = ros::message_traits::FrameId::value(*message);346 ros::Time stamp = ros::message_traits::TimeStamp::value(*message);347
348 //Throw out messages which have an empty frame_id
349 if(frame_id.empty())350{351 if (!warned_about_empty_frame_id_)352{353 warned_about_empty_frame_id_ = true;354 TF_MESSAGEFILTER_WARN("Discarding message from [%s] due to empty frame_id. This message will only print once.", callerid.c_str());355}356signalFailure(evt, filter_failure_reasons::EmptyFrameID);357 return true;358}359
360
361 //Throw out messages which are too old
363 for (std::vector<:string>::iterator target_it = target_frames_.begin(); target_it != target_frames_.end(); ++target_it)364{365 const std::string& target_frame = *target_it;366
367 if (target_frame != frame_id && stamp != ros::Time(0))368{369ros::Time latest_transform_time ;370
371 tf_.getLatestCommonTime(frame_id, target_frame, latest_transform_time, 0) ;372
373 if (stamp + tf_.getCacheLength()
379 "(stamp: %.3f + cache_length: %.3f < latest_transform_time %.3f."
380 "Message Count now: %d",381 ros::message_traits::FrameId::value(*message).c_str(),382 ros::message_traits::TimeStamp::value(*message).toSec(),383tf_.getCacheLength().toSec(), latest_transform_time.toSec(), message_count_);384
385 last_out_the_back_stamp_ =stamp;386 last_out_the_back_frame_ =frame_id;387
388signalFailure(evt, filter_failure_reasons::OutTheBack);389 return true;390}391}392
393}394
395 bool ready = !target_frames_.empty();396 for (std::vector<:string>::iterator target_it = target_frames_.begin(); ready && target_it != target_frames_.end(); ++target_it)397{398 std::string& target_frame = *target_it;399 if (time_tolerance_ != ros::Duration(0.0))400{401 ready = ready && (tf_.canTransform(target_frame, frame_id, stamp) &&
402 tf_.canTransform(target_frame, frame_id, stamp +time_tolerance_) );403}404 else
405{406 ready = ready &&tf_.canTransform(target_frame, frame_id, stamp);407}408}409
410 if(ready)411{412 TF_MESSAGEFILTER_DEBUG("Message ready in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_);413
414 ++successful_transform_count_;415
416 this->signalMessage(evt);417}418 else
419{420 ++failed_transform_count_;421}422
423 returnready;424}425
426 voidtestMessages()427{428 if (!messages_.empty() && getTargetFramesString() == " ")429{430 ROS_WARN_NAMED("message_notifier", "MessageFilter [target=%s]: empty target frame", getTargetFramesString().c_str());431}432
433 int i = 0;434
435 typename L_Event::iterator it =messages_.begin();436 for (; it != messages_.end(); ++i)437{438 MEvent& evt = *it;439
440 if(testMessage(evt))441{442 --message_count_;443 it =messages_.erase(it);444}445 else
446{447 ++it;448}449}450}451
452 void maxRateTimerCallback(const ros::TimerEvent&)453{454boost::mutex::scoped_lock list_lock(messages_mutex_);455 if(new_transforms_)456{457testMessages();458 new_transforms_ = false;459}460
461checkFailures();462}463
467 void incomingMessage(const ros::MessageEvent&evt)468{469add(evt);470}471
472 voidtransformsChanged()473{474 new_transforms_ = true;475
476 ++transform_message_count_;477}478
479 voidcheckFailures()480{481 if(next_failure_warning_.isZero())482{483 next_failure_warning_ = ros::Time::now() + ros::Duration(15);484}485
486 if (ros::Time::now() >=next_failure_warning_)487{488 if (incoming_message_count_ - message_count_ == 0)489{490 return;491}492
493 double dropped_pct = (double)dropped_message_count_ / (double)(incoming_message_count_ -message_count_);494 if (dropped_pct > 0.95)495{496 TF_MESSAGEFILTER_WARN("Dropped %.2f%% of messages so far. Please turn the [%s.message_notifier] rosconsole logger to DEBUG for more information.", dropped_pct*100, ROSCONSOLE_DEFAULT_NAME);497 next_failure_warning_ = ros::Time::now() + ros::Duration(60);498
499 if ((double)failed_out_the_back_count_ / (double)dropped_message_count_ > 0.5)500{501 TF_MESSAGEFILTER_WARN("The majority of dropped messages were due to messages growing older than the TF cache time. The last message's timestamp was: %f, and the last frame_id was: %s", last_out_the_back_stamp_.toSec(), last_out_the_back_frame_.c_str());502}503}504}505}506
507 void disconnectFailure(const message_filters::Connection&c)508{509 boost::mutex::scoped_lock lock(failure_signal_mutex_);510c.getBoostConnection().disconnect();511}512
513 void signalFailure(const MEvent&evt, FilterFailureReason reason)514{515 boost::mutex::scoped_lock lock(failure_signal_mutex_);516failure_signal_(evt.getMessage(), reason);517}518
519 Transformer&tf_;520ros::NodeHandle nh_;521ros::Duration max_rate_;522ros::Timer max_rate_timer_;523 std::vector<:string>target_frames_;524 std::stringtarget_frames_string_;525boost::mutex target_frames_string_mutex_;526uint32_t queue_size_;527
528L_Event messages_;529uint32_t message_count_;530boost::mutex messages_mutex_;531
532 boolnew_messages_;533 volatile boolnew_transforms_;534
535 boolwarned_about_unresolved_name_;536 boolwarned_about_empty_frame_id_;537
538uint64_t successful_transform_count_;539uint64_t failed_transform_count_;540uint64_t failed_out_the_back_count_;541uint64_t transform_message_count_;542uint64_t incoming_message_count_;543uint64_t dropped_message_count_;544
545ros::Time last_out_the_back_stamp_;546 std::stringlast_out_the_back_frame_;547
548ros::Time next_failure_warning_;549
550ros::Duration time_tolerance_;551
552boost::signals2::connection tf_connection_;553message_filters::Connection message_connection_;554
555FailureSignal failure_signal_;556boost::mutex failure_signal_mutex_;557};558
559
560 } //namespace tf
561
562 #endif
563