message_collection_helper.cpp
Go to the documentation of this file.
1 // SPDX-License-Identifier: BSD-3-Clause
2 
3 /*
4  * Copyright (c) 2020, Bjarne von Horn
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  * * Redistributions of source code must retain the above copyright notice,
10  * this list of conditions and the following disclaimer.
11  * * Redistributions in binary form must reproduce the above copyright notice,
12  * this list of conditions and the following disclaimer in the documentation
13  * and/or other materials provided with the distribution.
14  * * Neither the name of the copyright holder nor the names of its contributors
15  * may be used to endorse or promote products derived from this software
16  * without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
20  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED. IN NO EVENT SHALL BJARNE VON HORN BE LIABLE FOR ANY DIRECT,
22  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
24  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
25  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  */
30 
36 
37 #include <boost/make_shared.hpp>
38 #include <sqlite3.h>
39 #include <sstream>
40 #include <cstring>
41 #include <ros/console.h>
42 
44 warehouse_ros_sqlite::MessageCollectionHelper::findAndMatchMd5Sum(const std::array<unsigned char, 16>& md5_bytes)
45 {
46  sqlite3_stmt* stmt = nullptr;
47  std::ostringstream query_builder;
48  query_builder << "SELECT " << schema::M_D5_TABLE_M_D5_COLUMN << " FROM " << schema::M_D5_TABLE_NAME << " WHERE "
49  << schema::M_D5_TABLE_INDEX_COLUMN << " == ? ;";
50  const auto query = query_builder.str();
51  if (sqlite3_prepare_v2(db_.get(), query.c_str(), query.size() + 1, &stmt, nullptr) != SQLITE_OK)
52  {
53  throw InternalError("Prepare statement for findAndMatchMd5Sum() failed", db_.get());
54  }
55  sqlite3_stmt_ptr stmt_ptr(stmt);
56  if (sqlite3_bind_text(stmt, 1, mangled_tablename_.c_str(), mangled_tablename_.size(), SQLITE_STATIC) != SQLITE_OK)
57  {
58  throw InternalError("Bind parameter for findAndMatchMd5Sum() failed", db_.get());
59  }
60  switch (sqlite3_step(stmt))
61  {
62  case SQLITE_DONE:
64  case SQLITE_ROW:
65  break;
66  default:
67  throw InternalError("Fetch result for findAndMatchMd5Sum() failed", db_.get());
68  }
69 
70  if (std::size_t(sqlite3_column_bytes(stmt, 0)) != md5_bytes.size())
71  {
72  throw std::invalid_argument("invalid md5 value");
73  }
74  if (std::memcmp(&md5_bytes[0], sqlite3_column_blob(stmt, 0), md5_bytes.size()) == 0)
76  else
78 }
79 
80 bool warehouse_ros_sqlite::MessageCollectionHelper::initialize(const std::string& datatype, const std::string& md5)
81 {
82  using namespace warehouse_ros_sqlite::schema;
83  const auto md5_bytes = warehouse_ros_sqlite::parse_md5_hexstring(md5);
84  const auto current_md5_state = findAndMatchMd5Sum(md5_bytes);
85  if (current_md5_state != Md5CompareResult::EMPTY)
86  {
87  return current_md5_state == Md5CompareResult::MATCH;
88  }
89 
90  std::ostringstream query_builder;
92  query_builder << "BEGIN TRANSACTION; CREATE TABLE " << escaped_mangled_name_ << "(" << schema::DATA_COLUMN_NAME
93  << " BLOB NOT NULL, " << schema::METADATA_COLUMN_PREFIX << "id INTEGER PRIMARY KEY AUTOINCREMENT, "
94  << schema::METADATA_COLUMN_PREFIX << "creation_time INTEGER)"
95  << "; INSERT INTO " << M_D5_TABLE_NAME << " ( " << M_D5_TABLE_INDEX_COLUMN << " , "
97  << " , " << M_D5_TABLE_DATATYPE_COLUMN << ") VALUES ('" << esc(mangled_tablename_) << "', '"
98  << esc(collection_name_) << "', '" << esc(db_name_) << "' , x'" << md5 << "' , '" << esc(datatype)
99  << "'); COMMIT TRANSACTION;";
100  const auto query = query_builder.str();
101  ROS_DEBUG_NAMED("warehouse_ros_sqlite", "initialize query: %s", query.c_str());
102  if (sqlite3_exec(db_.get(), query.c_str(), nullptr, nullptr, nullptr) != SQLITE_OK)
103  {
104  ROS_ERROR_NAMED("warehouse_ros_sqlite", "Database initialization failed: %s", sqlite3_errmsg(db_.get()));
105  sqlite3_exec(db_.get(), "ROLLBACK;", nullptr, nullptr, nullptr);
106  return false;
107  }
108  return true;
109 }
110 
113 {
114  auto meta = reinterpret_cast<const warehouse_ros_sqlite::Metadata*>(metadata.get());
115  if (!meta || !msg || !msg_size)
116  throw std::invalid_argument("meta, msg or msg_size is 0");
117  meta->ensureColumns(db_.get(), mangled_tablename_);
118  std::ostringstream query;
119  query << "INSERT INTO " << escaped_mangled_name_ << " (" << schema::DATA_COLUMN_NAME;
120 
121  const auto& data = meta->data();
122  for (const auto& kv : data)
123  {
124  query << ", " << schema::escape_columnname_with_prefix(std::get<0>(kv));
125  }
126  query << ") VALUES ( ? ";
127  for (size_t i = 0; i < data.size(); ++i)
128  query << ", ? ";
129  query << ");";
130 
131  sqlite3_stmt* stmt = nullptr;
132  const auto query_str = query.str();
133  ROS_DEBUG_NAMED("warehouse_ros_sqlite", "insert query: %s", query_str.c_str());
134  if (sqlite3_prepare_v2(db_.get(), query_str.c_str(), query_str.size() + 1, &stmt, nullptr) != SQLITE_OK)
135  throw InternalError("Prepare statement for insert() failed", db_.get());
136  const sqlite3_stmt_ptr stmt_guard(stmt);
137 
138  if (sqlite3_bind_blob(stmt, 1, msg, msg_size, SQLITE_STATIC) != SQLITE_OK)
139  throw InternalError("Bind parameter for insert() failed", db_.get());
140  warehouse_ros_sqlite::BindVisitor visitor(stmt, 2);
141  for (const auto& kv : data)
142  {
143  if (boost::apply_visitor(visitor, std::get<1>(kv)) != SQLITE_OK)
144  throw InternalError("Bind parameter for insert() failed", db_.get());
145  }
146 
147  assert(sqlite3_bind_parameter_count(stmt) == visitor.getTotalBinds());
148  if (sqlite3_step(stmt) != SQLITE_DONE)
149  throw InternalError("insert() failed", db_.get());
150 }
151 
154  bool ascending) const
155 {
156  std::string outro;
157  if (!sort_by.empty())
158  {
159  outro += " ORDER BY " + schema::escape_columnname_with_prefix(sort_by) + (ascending ? " ASC" : " DESC");
160  }
161  auto query_ptr = dynamic_cast<const warehouse_ros_sqlite::Query*>(query.get());
162  assert(query_ptr);
163  std::ostringstream intro;
164  intro << "SELECT * FROM " << escaped_mangled_name_;
165  if (!query_ptr->empty())
166  {
167  intro << " WHERE ";
168  }
169  auto stmt = query_ptr->prepare(db_.get(), intro.str(), outro);
170  if (stmt)
171  {
172  switch (sqlite3_step(stmt.get()))
173  {
174  case SQLITE_DONE:
175  case SQLITE_ROW:
176  break;
177  default:
178  throw InternalError("query() failed", db_.get());
179  }
180  }
181  return boost::make_shared<warehouse_ros_sqlite::ResultIteratorHelper>(std::move(stmt));
182 }
183 
185 {
186  auto pquery = dynamic_cast<warehouse_ros_sqlite::Query const*>(query.get());
187  if (!pquery)
188  throw std::invalid_argument("Query was not initialized by createQuery()");
189  auto stmt = pquery->prepare(db_.get(), "DELETE FROM " + escaped_mangled_name_ + " WHERE ");
190  if (sqlite3_step(stmt.get()) != SQLITE_DONE)
191  {
192  throw InternalError("Prepare statement for removeMessages() failed", db_.get());
193  }
194  return sqlite3_changes(db_.get());
195 }
196 
197 namespace
198 {
199 template <typename It>
200 void comma_concat_meta_column_names(std::ostringstream& buf, It it, It end)
201 {
203  if (it == end)
204  return;
205  buf << escape_columnname_with_prefix(it->first);
206  it++;
207  while (it != end)
208  {
209  buf << " = ?, " << escape_columnname_with_prefix(it->first);
210  it++;
211  }
212  buf << " = ?";
213 }
214 } // namespace
215 
218 {
219  auto query = dynamic_cast<const warehouse_ros_sqlite::Query*>(q.get());
220  auto metadata = dynamic_cast<const warehouse_ros_sqlite::Metadata*>(m.get());
221  if (!query || !metadata)
222  throw std::invalid_argument("q or m not created by createQuery() or createMetadata()");
223  metadata->ensureColumns(db_.get(), mangled_tablename_);
224  const int mt_count = metadata->data().size();
225  if (mt_count == 0)
226  return;
227 
228  std::ostringstream query_builder;
229  query_builder << "UPDATE " << escaped_mangled_name_ << " SET ";
230 
231  comma_concat_meta_column_names(query_builder, metadata->data().begin(), metadata->data().end());
232  query_builder << " WHERE ";
233  auto stmt = query->prepare(db_.get(), query_builder.str(), "", mt_count + 1);
234  if (!stmt)
235  throw InternalError("modifyMetadata() failed", db_.get());
236  warehouse_ros_sqlite::BindVisitor visitor(stmt.get(), 1);
237  for (const auto& kv : metadata->data())
238  {
239  if (boost::apply_visitor(visitor, std::get<1>(kv)) != SQLITE_OK)
240  throw InternalError("Bind parameter failed for modifyMetadata()", db_.get());
241  }
242 
243  if (sqlite3_step(stmt.get()) != SQLITE_DONE)
244  throw InternalError("modifyMetadata() failed", db_.get());
245 }
246 
248 {
249  const std::string query = "SELECT COUNT(*) FROM " + escaped_mangled_name_ + ";";
250  sqlite3_stmt* stmt = nullptr;
251  if (sqlite3_prepare_v2(db_.get(), query.c_str(), query.size() + 1, &stmt, nullptr) != SQLITE_OK)
252  throw InternalError("Prepare statement for count() failed", db_.get());
253  const warehouse_ros_sqlite::sqlite3_stmt_ptr stmt_guard(stmt);
254 
255  if (sqlite3_step(stmt) != SQLITE_ROW)
256  throw InternalError("count() failed", db_.get());
257 
258  assert(sqlite3_column_count(stmt) == 1);
259 
260  return sqlite3_column_int(stmt, 0);
261 }
263 {
264  return boost::make_shared<warehouse_ros_sqlite::Query>();
265 }
267 {
268  return boost::make_shared<warehouse_ros_sqlite::Metadata>();
269 }
void modifyMetadata(warehouse_ros::Query::ConstPtr q, warehouse_ros::Metadata::ConstPtr m) override
unsigned removeMessages(warehouse_ros::Query::ConstPtr query) override
WAREHOUSE_ROS_SQLITE_EXPORT std::array< unsigned char, 16 > parse_md5_hexstring(const std::string &md5)
Definition: utils.h:127
constexpr const char * DATA_COLUMN_NAME
Definition: utils.h:85
constexpr const char * M_D5_TABLE_DATABASE_COLUMN
Definition: utils.h:92
constexpr const char * M_D5_TABLE_NAME
Definition: utils.h:87
warehouse_ros::Metadata::Ptr createMetadata() const override
std::string escape_string_literal_without_quotes(const std::string &c)
Definition: utils.h:106
Md5CompareResult findAndMatchMd5Sum(const std::array< unsigned char, 16 > &md5_bytes)
constexpr const char * M_D5_TABLE_TABLE_COLUMN
Definition: utils.h:91
#define ROS_DEBUG_NAMED(name,...)
warehouse_ros::ResultIteratorHelper::Ptr query(warehouse_ros::Query::ConstPtr query, const std::string &sort_by="", bool ascending=true) const override
constexpr const char * M_D5_TABLE_INDEX_COLUMN
Definition: utils.h:88
constexpr const char * M_D5_TABLE_M_D5_COLUMN
Definition: utils.h:89
escaped_columnname escape_columnname_with_prefix(const std::string &c)
Definition: utils.h:102
void insert(char *msg, size_t msg_size, warehouse_ros::Metadata::ConstPtr metadata) override
bool initialize(const std::string &datatype, const std::string &md5) override
std::unique_ptr< sqlite3_stmt, Sqlite3StmtDeleter > sqlite3_stmt_ptr
Definition: utils.h:50
constexpr const char * M_D5_TABLE_DATATYPE_COLUMN
Definition: utils.h:90
#define ROS_ERROR_NAMED(name,...)
constexpr const char * METADATA_COLUMN_PREFIX
Definition: utils.h:84
warehouse_ros::Query::Ptr createQuery() const override


warehouse_ros_sqlite
Author(s): Bjarne von Horn
autogenerated on Fri Nov 11 2022 03:44:33