00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "ros/init.h"
00036 #include "ros/names.h"
00037 #include "ros/xmlrpc_manager.h"
00038 #include "ros/poll_manager.h"
00039 #include "ros/connection_manager.h"
00040 #include "ros/topic_manager.h"
00041 #include "ros/service_manager.h"
00042 #include "ros/this_node.h"
00043 #include "ros/network.h"
00044 #include "ros/file_log.h"
00045 #include "ros/callback_queue.h"
00046 #include "ros/param.h"
00047 #include "ros/rosout_appender.h"
00048 #include "ros/subscribe_options.h"
00049 #include "ros/transport/transport_tcp.h"
00050 #include "ros/internal_timer_manager.h"
00051 #include "XmlRpcSocket.h"
00052
00053 #include "roscpp/GetLoggers.h"
00054 #include "roscpp/SetLoggerLevel.h"
00055 #include "roscpp/Empty.h"
00056
00057 #include <ros/console.h>
00058 #include <ros/time.h>
00059 #include <rosgraph_msgs/Clock.h>
00060
00061 #include <algorithm>
00062
00063 #include <signal.h>
00064
00065 #include <cstdlib>
00066
00067 namespace ros
00068 {
00069
00070 namespace master
00071 {
00072 void init(const M_string& remappings);
00073 }
00074
00075 namespace this_node
00076 {
00077 void init(const std::string& names, const M_string& remappings, uint32_t options);
00078 }
00079
00080 namespace network
00081 {
00082 void init(const M_string& remappings);
00083 }
00084
00085 namespace param
00086 {
00087 void init(const M_string& remappings);
00088 }
00089
00090 namespace file_log
00091 {
00092 void init(const M_string& remappings);
00093 }
00094
00095 CallbackQueuePtr g_global_queue;
00096 ROSOutAppender* g_rosout_appender;
00097 static CallbackQueuePtr g_internal_callback_queue;
00098
00099 static bool g_initialized = false;
00100 static bool g_started = false;
00101 static bool g_atexit_registered = false;
00102 static boost::mutex g_start_mutex;
00103 static bool g_ok = false;
00104 static uint32_t g_init_options = 0;
00105 static bool g_shutdown_requested = false;
00106 static volatile bool g_shutting_down = false;
00107 static boost::recursive_mutex g_shutting_down_mutex;
00108 static boost::thread g_internal_queue_thread;
00109
00110 bool isInitialized()
00111 {
00112 return g_initialized;
00113 }
00114
00115 bool isShuttingDown()
00116 {
00117 return g_shutting_down;
00118 }
00119
00120 void checkForShutdown()
00121 {
00122 if (g_shutdown_requested)
00123 {
00124
00125
00126 boost::recursive_mutex::scoped_try_lock lock(g_shutting_down_mutex, boost::defer_lock);
00127 while (!lock.try_lock() && !g_shutting_down)
00128 {
00129 ros::WallDuration(0.001).sleep();
00130 }
00131
00132 if (!g_shutting_down)
00133 {
00134 shutdown();
00135 }
00136
00137 g_shutdown_requested = false;
00138 }
00139 }
00140
00141 void requestShutdown()
00142 {
00143 g_shutdown_requested = true;
00144 }
00145
00146 void atexitCallback()
00147 {
00148 if (ok() && !isShuttingDown())
00149 {
00150 ROSCPP_LOG_DEBUG("shutting down due to exit() or end of main() without cleanup of all NodeHandles");
00151 shutdown();
00152 }
00153 }
00154
00155 void shutdownCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
00156 {
00157 int num_params = 0;
00158 if (params.getType() == XmlRpc::XmlRpcValue::TypeArray)
00159 num_params = params.size();
00160 if (num_params > 1)
00161 {
00162 std::string reason = params[1];
00163 ROS_WARN("Shutdown request received.");
00164 ROS_WARN("Reason given for shutdown: [%s]", reason.c_str());
00165 requestShutdown();
00166 }
00167
00168 result = xmlrpc::responseInt(1, "", 0);
00169 }
00170
00171 bool getLoggers(roscpp::GetLoggers::Request&, roscpp::GetLoggers::Response& resp)
00172 {
00173 std::map<std::string, ros::console::levels::Level> loggers;
00174 bool success = ::ros::console::get_loggers(loggers);
00175 if (success)
00176 {
00177 for (std::map<std::string, ros::console::levels::Level>::const_iterator it = loggers.begin(); it != loggers.end(); it++)
00178 {
00179 roscpp::Logger logger;
00180 logger.name = it->first;
00181 ros::console::levels::Level level = it->second;
00182 if (level == ros::console::levels::Debug)
00183 {
00184 logger.level = "debug";
00185 }
00186 else if (level == ros::console::levels::Info)
00187 {
00188 logger.level = "info";
00189 }
00190 else if (level == ros::console::levels::Warn)
00191 {
00192 logger.level = "warn";
00193 }
00194 else if (level == ros::console::levels::Error)
00195 {
00196 logger.level = "error";
00197 }
00198 else if (level == ros::console::levels::Fatal)
00199 {
00200 logger.level = "fatal";
00201 }
00202 resp.loggers.push_back(logger);
00203 }
00204 }
00205 return success;
00206 }
00207
00208 bool setLoggerLevel(roscpp::SetLoggerLevel::Request& req, roscpp::SetLoggerLevel::Response&)
00209 {
00210 std::transform(req.level.begin(), req.level.end(), req.level.begin(), (int(*)(int))std::toupper);
00211
00212 ros::console::levels::Level level;
00213 if (req.level == "DEBUG")
00214 {
00215 level = ros::console::levels::Debug;
00216 }
00217 else if (req.level == "INFO")
00218 {
00219 level = ros::console::levels::Info;
00220 }
00221 else if (req.level == "WARN")
00222 {
00223 level = ros::console::levels::Warn;
00224 }
00225 else if (req.level == "ERROR")
00226 {
00227 level = ros::console::levels::Error;
00228 }
00229 else if (req.level == "FATAL")
00230 {
00231 level = ros::console::levels::Fatal;
00232 }
00233 else
00234 {
00235 return false;
00236 }
00237
00238 bool success = ::ros::console::set_logger_level(req.logger, level);
00239 if (success)
00240 {
00241 console::notifyLoggerLevelsChanged();
00242 }
00243
00244 return success;
00245 }
00246
00247 bool closeAllConnections(roscpp::Empty::Request&, roscpp::Empty::Response&)
00248 {
00249 ROSCPP_LOG_DEBUG("close_all_connections service called, closing connections");
00250 ConnectionManager::instance()->clear(Connection::TransportDisconnect);
00251 return true;
00252 }
00253
00254 void clockCallback(const rosgraph_msgs::Clock::ConstPtr& msg)
00255 {
00256 Time::setNow(msg->clock);
00257 }
00258
00259 CallbackQueuePtr getInternalCallbackQueue()
00260 {
00261 if (!g_internal_callback_queue)
00262 {
00263 g_internal_callback_queue.reset(new CallbackQueue);
00264 }
00265
00266 return g_internal_callback_queue;
00267 }
00268
00269 void basicSigintHandler(int sig)
00270 {
00271 (void)sig;
00272 ros::requestShutdown();
00273 }
00274
00275 void internalCallbackQueueThreadFunc()
00276 {
00277 disableAllSignalsInThisThread();
00278
00279 CallbackQueuePtr queue = getInternalCallbackQueue();
00280
00281 while (!g_shutting_down)
00282 {
00283 queue->callAvailable(WallDuration(0.1));
00284 }
00285 }
00286
00287 bool isStarted()
00288 {
00289 return g_started;
00290 }
00291
00292 void start()
00293 {
00294 boost::mutex::scoped_lock lock(g_start_mutex);
00295 if (g_started)
00296 {
00297 return;
00298 }
00299
00300 g_shutdown_requested = false;
00301 g_shutting_down = false;
00302 g_started = true;
00303 g_ok = true;
00304
00305 bool enable_debug = false;
00306 std::string enable_debug_env;
00307 if ( get_environment_variable(enable_debug_env,"ROSCPP_ENABLE_DEBUG") )
00308 {
00309 try
00310 {
00311 enable_debug = boost::lexical_cast<bool>(enable_debug_env.c_str());
00312 }
00313 catch (boost::bad_lexical_cast&)
00314 {
00315 }
00316 }
00317
00318 char* env_ipv6 = NULL;
00319 #ifdef _MSC_VER
00320 _dupenv_s(&env_ipv6, NULL, "ROS_IPV6");
00321 #else
00322 env_ipv6 = getenv("ROS_IPV6");
00323 #endif
00324
00325 bool use_ipv6 = (env_ipv6 && strcmp(env_ipv6,"on") == 0);
00326 TransportTCP::s_use_ipv6_ = use_ipv6;
00327 XmlRpc::XmlRpcSocket::s_use_ipv6_ = use_ipv6;
00328
00329 #ifdef _MSC_VER
00330 if (env_ipv6)
00331 {
00332 free(env_ipv6);
00333 }
00334 #endif
00335
00336 param::param("/tcp_keepalive", TransportTCP::s_use_keepalive_, TransportTCP::s_use_keepalive_);
00337
00338 PollManager::instance()->addPollThreadListener(checkForShutdown);
00339 XMLRPCManager::instance()->bind("shutdown", shutdownCallback);
00340
00341 initInternalTimerManager();
00342
00343 TopicManager::instance()->start();
00344 ServiceManager::instance()->start();
00345 ConnectionManager::instance()->start();
00346 PollManager::instance()->start();
00347 XMLRPCManager::instance()->start();
00348
00349 if (!(g_init_options & init_options::NoSigintHandler))
00350 {
00351 signal(SIGINT, basicSigintHandler);
00352 }
00353
00354 ros::Time::init();
00355
00356 if (!(g_init_options & init_options::NoRosout))
00357 {
00358 g_rosout_appender = new ROSOutAppender;
00359 ros::console::register_appender(g_rosout_appender);
00360 }
00361
00362 if (g_shutting_down) goto end;
00363
00364 {
00365 ros::AdvertiseServiceOptions ops;
00366 ops.init<roscpp::GetLoggers>(names::resolve("~get_loggers"), getLoggers);
00367 ops.callback_queue = getInternalCallbackQueue().get();
00368 ServiceManager::instance()->advertiseService(ops);
00369 }
00370
00371 if (g_shutting_down) goto end;
00372
00373 {
00374 ros::AdvertiseServiceOptions ops;
00375 ops.init<roscpp::SetLoggerLevel>(names::resolve("~set_logger_level"), setLoggerLevel);
00376 ops.callback_queue = getInternalCallbackQueue().get();
00377 ServiceManager::instance()->advertiseService(ops);
00378 }
00379
00380 if (g_shutting_down) goto end;
00381
00382 if (enable_debug)
00383 {
00384 ros::AdvertiseServiceOptions ops;
00385 ops.init<roscpp::Empty>(names::resolve("~debug/close_all_connections"), closeAllConnections);
00386 ops.callback_queue = getInternalCallbackQueue().get();
00387 ServiceManager::instance()->advertiseService(ops);
00388 }
00389
00390 if (g_shutting_down) goto end;
00391
00392 {
00393 bool use_sim_time = false;
00394 param::param("/use_sim_time", use_sim_time, use_sim_time);
00395
00396 if (use_sim_time)
00397 {
00398 Time::setNow(ros::Time());
00399 }
00400
00401 if (g_shutting_down) goto end;
00402
00403 if (use_sim_time)
00404 {
00405 ros::SubscribeOptions ops;
00406 ops.init<rosgraph_msgs::Clock>(names::resolve("/clock"), 1, clockCallback);
00407 ops.callback_queue = getInternalCallbackQueue().get();
00408 TopicManager::instance()->subscribe(ops);
00409 }
00410 }
00411
00412 if (g_shutting_down) goto end;
00413
00414 g_internal_queue_thread = boost::thread(internalCallbackQueueThreadFunc);
00415 getGlobalCallbackQueue()->enable();
00416
00417 ROSCPP_LOG_DEBUG("Started node [%s], pid [%d], bound on [%s], xmlrpc port [%d], tcpros port [%d], using [%s] time",
00418 this_node::getName().c_str(), getpid(), network::getHost().c_str(),
00419 XMLRPCManager::instance()->getServerPort(), ConnectionManager::instance()->getTCPPort(),
00420 Time::useSystemTime() ? "real" : "sim");
00421
00422
00423
00424 end:
00425
00426 if (g_shutting_down)
00427 {
00428 boost::recursive_mutex::scoped_lock lock(g_shutting_down_mutex);
00429 }
00430 }
00431
00432 void init(const M_string& remappings, const std::string& name, uint32_t options)
00433 {
00434 if (!g_atexit_registered)
00435 {
00436 g_atexit_registered = true;
00437 atexit(atexitCallback);
00438 }
00439
00440 if (!g_global_queue)
00441 {
00442 g_global_queue.reset(new CallbackQueue);
00443 }
00444
00445 if (!g_initialized)
00446 {
00447 g_init_options = options;
00448 g_ok = true;
00449
00450 ROSCONSOLE_AUTOINIT;
00451
00452 #ifndef WIN32
00453 signal(SIGPIPE, SIG_IGN);
00454 #endif
00455 network::init(remappings);
00456 master::init(remappings);
00457
00458 this_node::init(name, remappings, options);
00459 file_log::init(remappings);
00460 param::init(remappings);
00461
00462 g_initialized = true;
00463 }
00464 }
00465
00466 void init(int& argc, char** argv, const std::string& name, uint32_t options)
00467 {
00468 M_string remappings;
00469
00470 int full_argc = argc;
00471
00472 for (int i = 0; i < argc; )
00473 {
00474 std::string arg = argv[i];
00475 size_t pos = arg.find(":=");
00476 if (pos != std::string::npos)
00477 {
00478 std::string local_name = arg.substr(0, pos);
00479 std::string external_name = arg.substr(pos + 2);
00480
00481 ROSCPP_LOG_DEBUG("remap: %s => %s", local_name.c_str(), external_name.c_str());
00482 remappings[local_name] = external_name;
00483
00484
00485 char *tmp = argv[i];
00486 for (int j = i; j < full_argc - 1; j++)
00487 argv[j] = argv[j+1];
00488 argv[argc-1] = tmp;
00489 argc--;
00490 }
00491 else
00492 {
00493 i++;
00494 }
00495 }
00496
00497 init(remappings, name, options);
00498 }
00499
00500 void init(const VP_string& remappings, const std::string& name, uint32_t options)
00501 {
00502 M_string remappings_map;
00503 VP_string::const_iterator it = remappings.begin();
00504 VP_string::const_iterator end = remappings.end();
00505 for (; it != end; ++it)
00506 {
00507 remappings_map[it->first] = it->second;
00508 }
00509
00510 init(remappings_map, name, options);
00511 }
00512
00513 std::string getROSArg(int argc, const char* const* argv, const std::string& arg)
00514 {
00515 for (int i = 0; i < argc; ++i)
00516 {
00517 std::string str_arg = argv[i];
00518 size_t pos = str_arg.find(":=");
00519 if (str_arg.substr(0,pos) == arg)
00520 {
00521 return str_arg.substr(pos+2);
00522 }
00523 }
00524 return "";
00525 }
00526
00527 void removeROSArgs(int argc, const char* const* argv, V_string& args_out)
00528 {
00529 for (int i = 0; i < argc; ++i)
00530 {
00531 std::string arg = argv[i];
00532 size_t pos = arg.find(":=");
00533 if (pos == std::string::npos)
00534 {
00535 args_out.push_back(arg);
00536 }
00537 }
00538 }
00539
00540 void spin()
00541 {
00542 SingleThreadedSpinner s;
00543 spin(s);
00544 }
00545
00546 void spin(Spinner& s)
00547 {
00548 s.spin();
00549 }
00550
00551 void spinOnce()
00552 {
00553 g_global_queue->callAvailable(ros::WallDuration());
00554 }
00555
00556 void waitForShutdown()
00557 {
00558 while (ok())
00559 {
00560 WallDuration(0.05).sleep();
00561 }
00562 }
00563
00564 CallbackQueue* getGlobalCallbackQueue()
00565 {
00566 return g_global_queue.get();
00567 }
00568
00569 bool ok()
00570 {
00571 return g_ok;
00572 }
00573
00574 void shutdown()
00575 {
00576 boost::recursive_mutex::scoped_lock lock(g_shutting_down_mutex);
00577 if (g_shutting_down)
00578 return;
00579 else
00580 g_shutting_down = true;
00581
00582 ros::console::shutdown();
00583
00584 g_global_queue->disable();
00585 g_global_queue->clear();
00586
00587 if (g_internal_queue_thread.get_id() != boost::this_thread::get_id())
00588 {
00589 g_internal_queue_thread.join();
00590 }
00591
00592 g_rosout_appender = 0;
00593
00594 if (g_started)
00595 {
00596 TopicManager::instance()->shutdown();
00597 ServiceManager::instance()->shutdown();
00598 PollManager::instance()->shutdown();
00599 ConnectionManager::instance()->shutdown();
00600 XMLRPCManager::instance()->shutdown();
00601 }
00602
00603 WallTime end = WallTime::now();
00604
00605 g_started = false;
00606 g_ok = false;
00607 Time::shutdown();
00608 }
00609
00610 }