mongodb_log_cimg.cpp
Go to the documentation of this file.
1 
2 /***************************************************************************
3  * mongodb_log_cimg.cpp - MongoDB Logger for compressed images
4  *
5  * Created: Wed Dec 15 17:59:25 2010
6  * Copyright 2010 Tim Niemueller [www.niemueller.de]
7  * 2010 Carnegie Mellon University
8  * 2010 Intel Labs Pittsburgh
9  ****************************************************************************/
10 
11 /* This program is free software; you can redistribute it and/or modify
12  * it under the terms of the GNU General Public License as published by
13  * the Free Software Foundation; either version 2 of the License, or
14  * (at your option) any later version. A runtime exception applies to
15  * this software (see LICENSE.GPL_WRE file mentioned below for details).
16  *
17  * This program is distributed in the hope that it will be useful,
18  * but WITHOUT ANY WARRANTY; without even the implied warranty of
19  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20  * GNU Library General Public License for more details.
21  *
22  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
23  */
24 
25 #include <ros/ros.h>
26 #include <mongo/client/dbclient.h>
27 #include <mongodb_store/util.h>
28 
29 #include <sensor_msgs/CompressedImage.h>
30 
31 using namespace mongo;
32 
33 DBClientConnection *mongodb_conn;
34 std::string collection;
35 
36 unsigned int in_counter;
37 unsigned int out_counter;
38 unsigned int qsize;
39 unsigned int drop_counter;
40 
41 static pthread_mutex_t in_counter_mutex = PTHREAD_MUTEX_INITIALIZER;
42 static pthread_mutex_t out_counter_mutex = PTHREAD_MUTEX_INITIALIZER;
43 static pthread_mutex_t drop_counter_mutex = PTHREAD_MUTEX_INITIALIZER;
44 static pthread_mutex_t qsize_mutex = PTHREAD_MUTEX_INITIALIZER;
45 
46 void msg_callback(const sensor_msgs::CompressedImage::ConstPtr& msg)
47 {
48  BSONObjBuilder document;
49 
50  Date_t stamp = msg->header.stamp.sec * 1000.0 + msg->header.stamp.nsec / 1000000.0;
51  document.append("header", BSON( "seq" << msg->header.seq
52  << "stamp" << stamp
53  << "frame_id" << msg->header.frame_id));
54  document.append("format", msg->format);
55  document.appendBinData("data", msg->data.size(), BinDataGeneral,
56  const_cast<unsigned char*>(&msg->data[0]));
57 
58  mongodb_store::add_meta_for_msg<sensor_msgs::CompressedImage>(msg, document);
59  mongodb_conn->insert(collection, document.obj());
60 
61  // If we'd get access to the message queue this could be more useful
62  // https://code.ros.org/trac/ros/ticket/744
63  pthread_mutex_lock(&in_counter_mutex);
64  ++in_counter;
65  pthread_mutex_unlock(&in_counter_mutex);
66  pthread_mutex_lock(&out_counter_mutex);
67  ++out_counter;
68  pthread_mutex_unlock(&out_counter_mutex);
69 }
70 
72 {
73  unsigned int l_in_counter, l_out_counter, l_drop_counter, l_qsize;
74 
75  pthread_mutex_lock(&in_counter_mutex);
76  l_in_counter = in_counter; in_counter = 0;
77  pthread_mutex_unlock(&in_counter_mutex);
78 
79  pthread_mutex_lock(&out_counter_mutex);
80  l_out_counter = out_counter; out_counter = 0;
81  pthread_mutex_unlock(&out_counter_mutex);
82 
83  pthread_mutex_lock(&drop_counter_mutex);
84  l_drop_counter = drop_counter; drop_counter = 0;
85  pthread_mutex_unlock(&drop_counter_mutex);
86 
87  pthread_mutex_lock(&qsize_mutex);
88  l_qsize = qsize; qsize = 0;
89  pthread_mutex_unlock(&qsize_mutex);
90 
91  printf("%u:%u:%u:%u\n", l_in_counter, l_out_counter, l_drop_counter, l_qsize);
92  fflush(stdout);
93 }
94 
95 
96 int
97 main(int argc, char **argv)
98 {
99  std::string topic = "", mongodb = "localhost", nodename = "";
100  collection = "";
101 
103 
104  int c;
105  while ((c = getopt(argc, argv, "t:m:n:c:")) != -1) {
106  if ((c == '?') || (c == ':')) {
107  printf("Usage: %s -t topic -m mongodb -n nodename -c collection\n", argv[0]);
108  exit(-1);
109  } else if (c == 't') {
110  topic = optarg;
111  } else if (c == 'm') {
112  mongodb = optarg;
113  } else if (c == 'n') {
114  nodename = optarg;
115  } else if (c == 'c') {
116  collection = optarg;
117  }
118  }
119 
120  if (topic == "") {
121  printf("No topic given.\n");
122  exit(-2);
123  } else if (nodename == "") {
124  printf("No node name given.\n");
125  exit(-2);
126  }
127 
128  ros::init(argc, argv, nodename);
129  ros::NodeHandle n;
130 
131  std::string errmsg;
132  mongodb_conn = new DBClientConnection(/* auto reconnect*/ true);
133  if (! mongodb_conn->connect(mongodb, errmsg)) {
134  ROS_ERROR("Failed to connect to MongoDB: %s", errmsg.c_str());
135  return -1;
136  }
137 
138  ros::Subscriber sub = n.subscribe<sensor_msgs::CompressedImage>(topic, 1000, msg_callback);
139  ros::Timer count_print_timer = n.createTimer(ros::Duration(5, 0), print_count);
140 
141  ros::spin();
142 
143  delete mongodb_conn;
144 
145  return 0;
146 }
static pthread_mutex_t drop_counter_mutex
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
ROSCPP_DECL void init(int &argc, char **argv, const std::string &name, uint32_t options=0)
static pthread_mutex_t qsize_mutex
unsigned int out_counter
void msg_callback(const sensor_msgs::CompressedImage::ConstPtr &msg)
unsigned int qsize
ROSCPP_DECL void spin(Spinner &spinner)
DBClientConnection * mongodb_conn
std::string topic
Timer createTimer(Rate r, Handler h, Obj o, bool oneshot=false, bool autostart=true) const
static pthread_mutex_t in_counter_mutex
void print_count(const ros::TimerEvent &te)
static pthread_mutex_t out_counter_mutex
unsigned int drop_counter
std::string collection
#define ROS_ERROR(...)
int main(int argc, char **argv)
unsigned int in_counter


mongodb_log
Author(s): Tim Niemueller
autogenerated on Sat Sep 7 2019 03:40:41