json_plugin_router.cpp
Go to the documentation of this file.
1 //
2 // Created by nakakura on 22/08/25.
3 //
4 
5 #include "json_plugin_router.h"
6 
7 //===== private =====
8 void JsonPluginRouter::observe_socket(std::vector<uint8_t> data) {
9  std::string message(data.begin(), data.end());
10  std::shared_ptr<rapidjson::Document> doc(new rapidjson::Document);
11  doc->Parse(message.c_str());
12 
13  if (doc->HasParseError()) {
14  ROS_ERROR("invalid json message: %s", message.c_str());
15  ROS_ERROR("error location: %ld", doc->GetErrorOffset());
16  ROS_ERROR("error mesasge: %s",
17  rapidjson::GetParseError_En(doc->GetParseError()));
18  return;
19  }
20 
21  for (auto plugin = plugins_.rbegin(); plugin != plugins_.rend(); ++plugin) {
22  (*plugin)->Execute(doc);
23  }
24 }
25 
27  std::shared_ptr<rapidjson::Document> doc) {
28  StringBuffer buffer;
29  Writer<StringBuffer> writer(buffer);
30  doc->Accept(writer);
31  std::string s(buffer.GetString(), buffer.GetSize());
32  std::vector<uint8_t> data(s.begin(), s.end());
33  socket_->SendData(data);
34 }
35 
36 //===== public =====
37 JsonPluginRouter::JsonPluginRouter(std::shared_ptr<rapidjson::Document> config,
38  udp::endpoint target_socket,
39  SocketFactory factory)
40  : plugin_loader_("skyway", "skyway_plugin::SkyWayJsonPlugin"),
41  config_(std::move(config)),
42  target_socket_(target_socket) {
43  // Socketからのcallbackを与えてSocketを生成`
44  socket_ = factory(
46  std::make_shared<std::function<void(std::vector<uint8_t>)>>(std::bind(
47  &JsonPluginRouter::observe_socket, this, std::placeholders::_1)));
48 }
49 
51  if (socket_) socket_->Stop();
52  for (auto plugin = plugins_.rbegin(); plugin != plugins_.rend(); ++plugin) {
53  (*plugin)->Shutdown();
54  plugin->reset();
55  }
56 
57  ROS_DEBUG("Succeeded in unloading all plugins");
58 }
59 
61  // plugin情報の配列を与えられていない場合は開始できない
62  if (!config_->IsArray()) {
63  return {false, 0, "invalid config parameters"};
64  }
65 
66  auto callback = std::make_shared<
67  std::function<void(std::shared_ptr<rapidjson::Document>)>>(std::bind(
68  &JsonPluginRouter::observe_plugins, this, std::placeholders::_1));
69 
70  for (rapidjson::Value::ConstValueIterator itr = config_->Begin();
71  itr != config_->End(); ++itr) {
72  if (!itr->HasMember("plugin_name")) continue;
73  std::string plugin_name = (*itr)["plugin_name"].GetString();
74 
75  try {
77  plugin_loader_.createInstance(plugin_name);
78  std::shared_ptr<rapidjson::Document> parameter(new rapidjson::Document);
79  parameter->CopyFrom(*itr, parameter->GetAllocator());
80  plugin->Initialize(std::move(parameter), callback);
81  plugins_.push_back(plugin);
82  ROS_DEBUG("plugin %s has been loaded successfully", plugin_name.c_str());
83  } catch (pluginlib::PluginlibException& ex) {
84  // pluginがopenできなかったらここでreturnする
85  std::ostringstream stream;
86  stream << "Failed to load " << plugin_name << ex.what();
87  std::string message = stream.str();
88  char* data = (char*)malloc(strlen(message.c_str()) + 1);
89  ROS_ERROR("plugin load error: %s", message.c_str());
90  strcpy(data, message.c_str());
91 
92  return {false, 0, (const char*)data};
93  }
94  }
95 
96  // ここでsocket startするとデータが流れ始める
97  socket_->Start();
98 
99  return {true, socket_->Port(), ""};
100 }
101 
102 uint16_t JsonPluginRouter::Port() { return socket_->Port(); }
103 
104 Component<fruit::Annotated<JsonAnnotation, PluginRouterFactory>>
106  return createComponent()
107  .bind<fruit::Annotated<JsonAnnotation, PluginRouter>, JsonPluginRouter>()
108  .install(getUdpSocketComponent);
109 }
boost::shared_ptr< T > createInstance(const std::string &lookup_name)
Component< fruit::Annotated< JsonAnnotation, PluginRouterFactory > > getJsonPluginRouterComponent()
std::vector< boost::shared_ptr< skyway_plugin::SkyWayJsonPlugin > > plugins_
XmlRpcServer s
std::function< std::unique_ptr< Socket >(udp::endpoint, std::shared_ptr< std::function< void(std::vector< uint8_t >)> >)> SocketFactory
Definition: udp_socket.h:69
void observe_socket(std::vector< uint8_t > data)
pluginlib::ClassLoader< skyway_plugin::SkyWayJsonPlugin > plugin_loader_
std::unique_ptr< Socket > socket_
std::shared_ptr< rapidjson::Document > config_
void observe_plugins(std::shared_ptr< rapidjson::Document > document)
virtual PluginResult TryStart() override
Component< SocketFactory > getUdpSocketComponent()
Definition: udp_socket.cpp:87
udp::endpoint target_socket_
virtual uint16_t Port() override
#define ROS_ERROR(...)
#define ROS_DEBUG(...)


skyway
Author(s): Toshiya Nakakura
autogenerated on Sat Apr 15 2023 02:08:21