Go to the documentation of this file.00001
00023 #include "micros_swarm/app_manager.h"
00024
00025 namespace micros_swarm{
00026
00027 Worker::Worker(int id)
00028 {
00029 id_ = id;
00030 thread_ = new boost::thread(&Worker::workFunc, this);
00031 run_ = true;
00032 apps_.clear();
00033 }
00034
00035 Worker::~Worker()
00036 {
00037 run_ = false;
00038 for(int i = 0; i < apps_.size(); i++) {
00039 apps_[i]->app_ptr_.reset();
00040 delete apps_[i];
00041 }
00042 apps_.clear();
00043 thread_->interrupt();
00044 thread_->join();
00045 delete thread_;
00046 }
00047
00048 void Worker::addApp(AppInstance* app)
00049 {
00050 apps_.push_back(app);
00051 }
00052
00053 void Worker::removeApp(const std::string& app_name)
00054 {
00055 std::vector<AppInstance*>::iterator app_it;
00056 for(app_it = apps_.begin(); app_it != apps_.end(); app_it++) {
00057 if((*app_it)->app_name_ == app_name) {
00058
00059 (*app_it)->running_ = false;
00060 (*app_it)->app_ptr_.reset();
00061 delete (*app_it);
00062 app_it = apps_.erase(app_it);
00063 return;
00064 }
00065 }
00066 }
00067
00068 int Worker::getAppNum()
00069 {
00070 return apps_.size();
00071 }
00072
00073 AppInstance* Worker::getAppInstance(const std::string& app_name)
00074 {
00075 std::vector<AppInstance*>::iterator app_it;
00076 for(app_it = apps_.begin(); app_it != apps_.end(); app_it++) {
00077 if((*app_it)->app_name_ == app_name) {
00078 return (*app_it);
00079 }
00080 }
00081
00082 AppInstance *app_ins = NULL;
00083 return app_ins;
00084 }
00085
00086 void Worker::workFunc()
00087 {
00088 while(run_) {
00089 std::vector<AppInstance*>::iterator app_it;
00090 for(app_it = apps_.begin(); app_it != apps_.end(); app_it++) {
00091 if(!(*app_it)->running_) {
00092 (*app_it)->app_ptr_->init();
00093 (*app_it)->app_ptr_->start();
00094 (*app_it)->running_ = true;
00095 }
00096 }
00097 ros::Duration(0.2).sleep();
00098 }
00099 }
00100
00101 AppManager::AppManager():app_loader_("micros_swarm", "micros_swarm::Application")
00102 {
00103 ros::NodeHandle nh;
00104 app_load_srv_ = nh.advertiseService("app_loader_load_app", &AppManager::loadService, this);
00105 app_unload_srv_ = nh.advertiseService("app_loader_unload_app", &AppManager::unloadService, this);
00106 worker_num_ = 4;
00107 worker_table_.clear();
00108 load_table_.clear();
00109 for(int i = 0; i < worker_num_; i++) {
00110 Worker *worker = new Worker(i);
00111 worker_table_.push_back(worker);
00112 load_table_.push_back(0);
00113 }
00114 apps_record_.clear();
00115 plugin_use_count_.clear();
00116 }
00117
00118 AppManager::AppManager(int worker_num):app_loader_("micros_swarm", "micros_swarm::Application")
00119 {
00120 ros::NodeHandle nh;
00121 app_load_srv_ = nh.advertiseService("app_loader_load_app", &AppManager::loadService, this);
00122 app_unload_srv_ = nh.advertiseService("app_loader_unload_app", &AppManager::unloadService, this);
00123 worker_num_ = worker_num;
00124 worker_table_.clear();
00125 load_table_.clear();
00126 for(int i = 0; i < worker_num_; i++) {
00127 Worker *worker = new Worker(i);
00128 worker_table_.push_back(worker);
00129 load_table_.push_back(0);
00130 }
00131 apps_record_.clear();
00132 plugin_use_count_.clear();
00133 }
00134
00135 void AppManager::stop()
00136 {
00137 ros::NodeHandle nh;
00138 std::map<std::string, int>::iterator it;
00139 for(it = apps_record_.begin(); it != apps_record_.end(); it++) {
00140 std::string topic_name = "runtime_core_destroy_" + it->first;
00141 ros::ServiceClient client = nh.serviceClient<app_loader::RTDestroy>(topic_name);
00142 app_loader::RTDestroy srv;
00143 srv.request.code = 1;
00144
00145 if (client.call(srv)) {
00146
00147 }
00148 else {
00149
00150 }
00151 ros::Duration(0.1).sleep();
00152 }
00153
00154 for(it = apps_record_.begin(); it != apps_record_.end(); ) {
00155 int worker_id = it->second;
00156 AppInstance *instance = worker_table_[worker_id]->getAppInstance(it->first);
00157 if(instance != NULL) {
00158 instance->app_ptr_->stop();
00159 instance->app_ptr_.reset();
00160 std::string app_type = instance->app_type_;
00161 worker_table_[worker_id]->removeApp(it->first);
00162 load_table_[worker_id] -= 1;
00163 decreasePluginUseCount(app_type);
00164 if(getPluginUseCount(app_type) == 0) {
00165 app_loader_.unloadLibraryForClass(app_type);
00166 }
00167 }
00168
00169 apps_record_.erase(it++);
00170 }
00171 std::cout<<"[AppManager]: all Apps were unloaded successfully."<<std::endl;
00172
00173 for(int i = 0; i < worker_table_.size(); i++) {
00174 delete worker_table_[i];
00175 }
00176 worker_table_.clear();
00177 std::cout<<"[AppManager]: destroy all the worker successfully."<<std::endl;
00178 }
00179
00180 AppManager::~AppManager() {}
00181
00182 bool AppManager::recordExist(const std::string& name)
00183 {
00184 std::map<std::string, int>::iterator it = apps_record_.find(name);
00185
00186 if(it != apps_record_.end()) {
00187 return true;
00188 }
00189
00190 return false;
00191 }
00192
00193 void AppManager::addRecord(const std::string& name, int worker_id)
00194 {
00195 apps_record_.insert(std::pair<std::string, int>(name, worker_id));
00196 }
00197
00198 void AppManager::removeRecord(const std::string& name)
00199 {
00200 apps_record_.erase(name);
00201 }
00202
00203 int AppManager::allocateWorker()
00204 {
00205 int min_index = 0;
00206 for(int i = 1; i < worker_num_; i++) {
00207 if(load_table_[i] < load_table_[min_index]) {
00208 min_index = i;
00209 }
00210 }
00211
00212 return min_index;
00213 }
00214
00215 void AppManager::insertOrUpdatePluginUseCount(const std::string& type)
00216 {
00217 std::map<std::string, int>::iterator it;
00218 it = plugin_use_count_.find(type);
00219 if(it != plugin_use_count_.end()) {
00220 it->second += 1;
00221 }
00222 else {
00223 plugin_use_count_.insert(std::pair<std::string, int>(type, 1));
00224 }
00225 }
00226
00227 void AppManager::decreasePluginUseCount(const std::string& type)
00228 {
00229 std::map<std::string, int>::iterator it;
00230 it = plugin_use_count_.find(type);
00231 if(it != plugin_use_count_.end()) {
00232 it->second -= 1;
00233 if(it->second == 0) {
00234 plugin_use_count_.erase(type);
00235 }
00236 }
00237 }
00238
00239 int AppManager::getPluginUseCount(const std::string& type)
00240 {
00241 std::map<std::string, int>::iterator it;
00242 it = plugin_use_count_.find(type);
00243 if(it != plugin_use_count_.end()) {
00244 return plugin_use_count_[type];
00245 }
00246
00247 return 0;
00248 }
00249
00250 bool AppManager::loadService(app_loader::AppLoad::Request &req, app_loader::AppLoad::Response &resp)
00251 {
00252 std::string app_name = req.name;
00253 std::string app_type = req.type;
00254
00255 bool app_exist = recordExist(app_name);
00256 if(app_exist) {
00257 ROS_WARN("[AppManager]: App %s was already existed.", app_name.c_str());
00258 ros::Duration(0.1).sleep();
00259 resp.success = false;
00260 return false;
00261 }
00262 else {
00263 boost::shared_ptr<micros_swarm::Application> app;
00264 try {
00265 app = app_loader_.createInstance(app_type);
00266 }
00267 catch(pluginlib::PluginlibException& ex) {
00268 ROS_ERROR("[AppManager]: App %s failed to load for some reason. Error: %s", app_name.c_str(), ex.what());
00269 }
00270
00271 AppInstance* app_instance = new AppInstance();
00272 app_instance->app_name_ = app_name;
00273 app_instance->app_type_ = app_type;
00274 app_instance->app_ptr_ = app;
00275 app_instance->running_ = false;
00276
00277 int worker_index = allocateWorker();
00278 worker_table_[worker_index]->addApp(app_instance);
00279 load_table_[worker_index] += 1;
00280 addRecord(app_name, worker_index);
00281 insertOrUpdatePluginUseCount(app_type);
00282 ROS_INFO("[AppManager]: App %s was loaded successfully.", app_name.c_str());
00283 ros::Duration(0.1).sleep();
00284 resp.success = true;
00285 return true;
00286 }
00287 }
00288
00289 bool AppManager::unloadService(app_loader::AppUnload::Request &req, app_loader::AppUnload::Response &resp)
00290 {
00291 std::string app_name = req.name;
00292 std::string app_type = req.type;
00293
00294 if(recordExist(app_name)) {
00295 int worker_id = apps_record_[app_name];
00296 AppInstance *instance = worker_table_[worker_id]->getAppInstance(app_name);
00297 if(instance == NULL) {
00298 ROS_WARN("[AppManager]: Get App instance %s failed.", app_name.c_str());
00299 ros::Duration(0.1).sleep();
00300 resp.success = false;
00301 return false;
00302 }
00303 instance->app_ptr_.reset();
00304 worker_table_[worker_id]->removeApp(app_name);
00305 load_table_[worker_id] -= 1;
00306 removeRecord(app_name);
00307 decreasePluginUseCount(app_type);
00308 if(getPluginUseCount(app_type) == 0) {
00309 app_loader_.unloadLibraryForClass(app_type);
00310 }
00311 ROS_INFO("[AppManager]: App %s was unloaded successfully.", app_name.c_str());
00312 ros::Duration(0.1).sleep();
00313 resp.success = true;
00314 return true;
00315 }
00316 ROS_WARN("[AppManager]: App %s does not exist.", app_name.c_str());
00317 ros::Duration(0.1).sleep();
00318 resp.success = false;
00319 return false;
00320 }
00321 };