37 #include <boost/make_shared.hpp> 46 sqlite3_stmt* stmt =
nullptr;
47 std::ostringstream query_builder;
50 const auto query = query_builder.str();
51 if (sqlite3_prepare_v2(
db_.get(),
query.c_str(),
query.size() + 1, &stmt,
nullptr) != SQLITE_OK)
53 throw InternalError(
"Prepare statement for findAndMatchMd5Sum() failed",
db_.get());
58 throw InternalError(
"Bind parameter for findAndMatchMd5Sum() failed",
db_.get());
60 switch (sqlite3_step(stmt))
67 throw InternalError(
"Fetch result for findAndMatchMd5Sum() failed",
db_.get());
70 if (std::size_t(sqlite3_column_bytes(stmt, 0)) != md5_bytes.size())
72 throw std::invalid_argument(
"invalid md5 value");
74 if (std::memcmp(&md5_bytes[0], sqlite3_column_blob(stmt, 0), md5_bytes.size()) == 0)
90 std::ostringstream query_builder;
99 <<
"'); COMMIT TRANSACTION;";
100 const auto query = query_builder.str();
102 if (sqlite3_exec(
db_.get(),
query.c_str(),
nullptr,
nullptr,
nullptr) != SQLITE_OK)
104 ROS_ERROR_NAMED(
"warehouse_ros_sqlite",
"Database initialization failed: %s", sqlite3_errmsg(
db_.get()));
105 sqlite3_exec(
db_.get(),
"ROLLBACK;",
nullptr,
nullptr,
nullptr);
115 if (!meta || !msg || !msg_size)
116 throw std::invalid_argument(
"meta, msg or msg_size is 0");
118 std::ostringstream
query;
121 const auto& data = meta->data();
122 for (
const auto& kv : data)
126 query <<
") VALUES ( ? ";
127 for (
size_t i = 0; i < data.size(); ++i)
131 sqlite3_stmt* stmt =
nullptr;
132 const auto query_str = query.str();
133 ROS_DEBUG_NAMED(
"warehouse_ros_sqlite",
"insert query: %s", query_str.c_str());
134 if (sqlite3_prepare_v2(
db_.get(), query_str.c_str(), query_str.size() + 1, &stmt,
nullptr) != SQLITE_OK)
138 if (sqlite3_bind_blob(stmt, 1, msg, msg_size, SQLITE_STATIC) != SQLITE_OK)
141 for (
const auto& kv : data)
143 if (boost::apply_visitor(visitor, std::get<1>(kv)) != SQLITE_OK)
147 assert(sqlite3_bind_parameter_count(stmt) == visitor.getTotalBinds());
148 if (sqlite3_step(stmt) != SQLITE_DONE)
154 bool ascending)
const 157 if (!sort_by.empty())
163 std::ostringstream intro;
165 if (!query_ptr->empty())
169 auto stmt = query_ptr->prepare(
db_.get(), intro.str(), outro);
172 switch (sqlite3_step(stmt.get()))
181 return boost::make_shared<warehouse_ros_sqlite::ResultIteratorHelper>(std::move(stmt));
188 throw std::invalid_argument(
"Query was not initialized by createQuery()");
190 if (sqlite3_step(stmt.get()) != SQLITE_DONE)
192 throw InternalError(
"Prepare statement for removeMessages() failed",
db_.get());
194 return sqlite3_changes(
db_.get());
199 template <
typename It>
200 void comma_concat_meta_column_names(std::ostringstream& buf, It it, It end)
221 if (!
query || !metadata)
222 throw std::invalid_argument(
"q or m not created by createQuery() or createMetadata()");
224 const int mt_count = metadata->data().size();
228 std::ostringstream query_builder;
231 comma_concat_meta_column_names(query_builder, metadata->data().begin(), metadata->data().end());
232 query_builder <<
" WHERE ";
233 auto stmt =
query->prepare(
db_.get(), query_builder.str(),
"", mt_count + 1);
237 for (
const auto& kv : metadata->data())
239 if (boost::apply_visitor(visitor, std::get<1>(kv)) != SQLITE_OK)
240 throw InternalError(
"Bind parameter failed for modifyMetadata()",
db_.get());
243 if (sqlite3_step(stmt.get()) != SQLITE_DONE)
250 sqlite3_stmt* stmt =
nullptr;
251 if (sqlite3_prepare_v2(
db_.get(), query.c_str(), query.size() + 1, &stmt,
nullptr) != SQLITE_OK)
255 if (sqlite3_step(stmt) != SQLITE_ROW)
258 assert(sqlite3_column_count(stmt) == 1);
260 return sqlite3_column_int(stmt, 0);
264 return boost::make_shared<warehouse_ros_sqlite::Query>();
268 return boost::make_shared<warehouse_ros_sqlite::Metadata>();
void modifyMetadata(warehouse_ros::Query::ConstPtr q, warehouse_ros::Metadata::ConstPtr m) override
std::string collection_name_
unsigned removeMessages(warehouse_ros::Query::ConstPtr query) override
WAREHOUSE_ROS_SQLITE_EXPORT std::array< unsigned char, 16 > parse_md5_hexstring(const std::string &md5)
constexpr const char * DATA_COLUMN_NAME
constexpr const char * M_D5_TABLE_DATABASE_COLUMN
constexpr const char * M_D5_TABLE_NAME
warehouse_ros::Metadata::Ptr createMetadata() const override
unsigned count() override
std::string escape_string_literal_without_quotes(const std::string &c)
Md5CompareResult findAndMatchMd5Sum(const std::array< unsigned char, 16 > &md5_bytes)
constexpr const char * M_D5_TABLE_TABLE_COLUMN
#define ROS_DEBUG_NAMED(name,...)
warehouse_ros::ResultIteratorHelper::Ptr query(warehouse_ros::Query::ConstPtr query, const std::string &sort_by="", bool ascending=true) const override
constexpr const char * M_D5_TABLE_INDEX_COLUMN
schema::escaped_tablename escaped_mangled_name_
constexpr const char * M_D5_TABLE_M_D5_COLUMN
escaped_columnname escape_columnname_with_prefix(const std::string &c)
void insert(char *msg, size_t msg_size, warehouse_ros::Metadata::ConstPtr metadata) override
bool initialize(const std::string &datatype, const std::string &md5) override
std::unique_ptr< sqlite3_stmt, Sqlite3StmtDeleter > sqlite3_stmt_ptr
std::string mangled_tablename_
constexpr const char * M_D5_TABLE_DATATYPE_COLUMN
#define ROS_ERROR_NAMED(name,...)
constexpr const char * METADATA_COLUMN_PREFIX
warehouse_ros::Query::Ptr createQuery() const override