00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "rtabmap/core/DBDriver.h"
00029
00030 #include "rtabmap/core/Signature.h"
00031 #include "VisualWord.h"
00032 #include "rtabmap/utilite/UConversion.h"
00033 #include "rtabmap/utilite/UMath.h"
00034 #include "rtabmap/utilite/ULogger.h"
00035 #include "rtabmap/utilite/UTimer.h"
00036 #include "rtabmap/utilite/UStl.h"
00037
00038 namespace rtabmap {
00039
00040 DBDriver::DBDriver(const ParametersMap & parameters) :
00041 _emptyTrashesTime(0),
00042 _timestampUpdate(true)
00043 {
00044 this->parseParameters(parameters);
00045 }
00046
00047 DBDriver::~DBDriver()
00048 {
00049 join(true);
00050 this->emptyTrashes();
00051 }
00052
00053 void DBDriver::parseParameters(const ParametersMap & parameters)
00054 {
00055 }
00056
00057 void DBDriver::closeConnection()
00058 {
00059 UDEBUG("isRunning=%d", this->isRunning());
00060 this->join(true);
00061 UDEBUG("");
00062 this->emptyTrashes();
00063 _dbSafeAccessMutex.lock();
00064 this->disconnectDatabaseQuery();
00065 _dbSafeAccessMutex.unlock();
00066 UDEBUG("");
00067 }
00068
00069 bool DBDriver::openConnection(const std::string & url, bool overwritten)
00070 {
00071 UDEBUG("");
00072 _url = url;
00073 _dbSafeAccessMutex.lock();
00074 if(this->connectDatabaseQuery(url, overwritten))
00075 {
00076 _dbSafeAccessMutex.unlock();
00077 return true;
00078 }
00079 _dbSafeAccessMutex.unlock();
00080 return false;
00081 }
00082
00083 bool DBDriver::isConnected() const
00084 {
00085 bool r;
00086 _dbSafeAccessMutex.lock();
00087 r = isConnectedQuery();
00088 _dbSafeAccessMutex.unlock();
00089 return r;
00090 }
00091
00092
00093 long DBDriver::getMemoryUsed() const
00094 {
00095 long bytes;
00096 _dbSafeAccessMutex.lock();
00097 bytes = getMemoryUsedQuery();
00098 _dbSafeAccessMutex.unlock();
00099 return bytes;
00100 }
00101
00102 void DBDriver::mainLoop()
00103 {
00104 this->emptyTrashes();
00105 this->kill();
00106 }
00107
00108 void DBDriver::beginTransaction() const
00109 {
00110 _transactionMutex.lock();
00111 ULOGGER_DEBUG("");
00112 this->executeNoResultQuery("BEGIN TRANSACTION;");
00113 }
00114
00115 void DBDriver::commit() const
00116 {
00117 ULOGGER_DEBUG("");
00118 this->executeNoResultQuery("COMMIT;");
00119 _transactionMutex.unlock();
00120 }
00121
00122 void DBDriver::executeNoResult(const std::string & sql) const
00123 {
00124 _dbSafeAccessMutex.lock();
00125 this->executeNoResultQuery(sql);
00126 _dbSafeAccessMutex.unlock();
00127 }
00128
00129 void DBDriver::emptyTrashes(bool async)
00130 {
00131 if(async)
00132 {
00133 ULOGGER_DEBUG("Async emptying, start the trash thread");
00134 this->start();
00135 return;
00136 }
00137
00138 UTimer totalTime;
00139 totalTime.start();
00140
00141 std::map<int, Signature*> signatures;
00142 std::map<int, VisualWord*> visualWords;
00143 _trashesMutex.lock();
00144 {
00145 ULOGGER_DEBUG("signatures=%d, visualWords=%d", _trashSignatures.size(), _trashVisualWords.size());
00146 signatures = _trashSignatures;
00147 visualWords = _trashVisualWords;
00148 _trashSignatures.clear();
00149 _trashVisualWords.clear();
00150
00151 _dbSafeAccessMutex.lock();
00152 }
00153 _trashesMutex.unlock();
00154
00155 if(signatures.size() || visualWords.size())
00156 {
00157 this->beginTransaction();
00158 UTimer timer;
00159 timer.start();
00160 if(signatures.size())
00161 {
00162 if(this->isConnected())
00163 {
00164
00165 this->saveOrUpdate(uValues(signatures));
00166 }
00167
00168 for(std::map<int, Signature *>::iterator iter=signatures.begin(); iter!=signatures.end(); ++iter)
00169 {
00170 delete iter->second;
00171 }
00172 signatures.clear();
00173 ULOGGER_DEBUG("Time emptying memory signatures trash = %f...", timer.ticks());
00174 }
00175 if(visualWords.size())
00176 {
00177 if(this->isConnected())
00178 {
00179
00180 this->saveOrUpdate(uValues(visualWords));
00181 }
00182
00183 for(std::map<int, VisualWord *>::iterator iter=visualWords.begin(); iter!=visualWords.end(); ++iter)
00184 {
00185 delete (*iter).second;
00186 }
00187 visualWords.clear();
00188 ULOGGER_DEBUG("Time emptying memory visualWords trash = %f...", timer.ticks());
00189 }
00190
00191 this->commit();
00192 }
00193
00194 _emptyTrashesTime = totalTime.ticks();
00195 ULOGGER_DEBUG("Total time emptying trashes = %fs...", _emptyTrashesTime);
00196
00197 _dbSafeAccessMutex.unlock();
00198 }
00199
00200 void DBDriver::asyncSave(Signature * s)
00201 {
00202 if(s)
00203 {
00204 UDEBUG("s=%d", s->id());
00205 _trashesMutex.lock();
00206 {
00207 _trashSignatures.insert(std::pair<int, Signature*>(s->id(), s));
00208 }
00209 _trashesMutex.unlock();
00210 }
00211 }
00212
00213 void DBDriver::asyncSave(VisualWord * vw)
00214 {
00215 if(vw)
00216 {
00217 _trashesMutex.lock();
00218 {
00219 _trashVisualWords.insert(std::pair<int, VisualWord*>(vw->id(), vw));
00220 }
00221 _trashesMutex.unlock();
00222 }
00223 }
00224
00225 void DBDriver::saveOrUpdate(const std::vector<Signature *> & signatures) const
00226 {
00227 ULOGGER_DEBUG("");
00228 std::list<Signature *> toSave;
00229 std::list<Signature *> toUpdate;
00230 if(this->isConnected() && signatures.size())
00231 {
00232 for(std::vector<Signature *>::const_iterator i=signatures.begin(); i!=signatures.end();++i)
00233 {
00234 if((*i)->isSaved())
00235 {
00236 toUpdate.push_back(*i);
00237 }
00238 else
00239 {
00240 toSave.push_back(*i);
00241 }
00242 }
00243
00244 if(toUpdate.size())
00245 {
00246 this->updateQuery(toUpdate, _timestampUpdate);
00247 }
00248 if(toSave.size())
00249 {
00250 this->saveQuery(toSave);
00251 }
00252 }
00253 }
00254
00255 void DBDriver::saveOrUpdate(const std::vector<VisualWord *> & words) const
00256 {
00257 ULOGGER_DEBUG("");
00258 std::list<VisualWord *> toSave;
00259 std::list<VisualWord *> toUpdate;
00260 if(this->isConnected() && words.size())
00261 {
00262 for(std::vector<VisualWord *>::const_iterator i=words.begin(); i!=words.end();++i)
00263 {
00264 if((*i)->isSaved())
00265 {
00266 toUpdate.push_back(*i);
00267 }
00268 else
00269 {
00270 toSave.push_back(*i);
00271 }
00272 }
00273
00274 if(toUpdate.size())
00275 {
00276 this->updateQuery(toUpdate, _timestampUpdate);
00277 }
00278 if(toSave.size())
00279 {
00280 this->saveQuery(toSave);
00281 }
00282 }
00283 }
00284
00285 void DBDriver::load(VWDictionary * dictionary) const
00286 {
00287 _dbSafeAccessMutex.lock();
00288 this->loadQuery(dictionary);
00289 _dbSafeAccessMutex.unlock();
00290 }
00291
00292 void DBDriver::loadLastNodes(std::list<Signature *> & signatures) const
00293 {
00294 _dbSafeAccessMutex.lock();
00295 this->loadLastNodesQuery(signatures);
00296 _dbSafeAccessMutex.unlock();
00297 }
00298
00299 void DBDriver::loadSignatures(const std::list<int> & signIds,
00300 std::list<Signature *> & signatures,
00301 std::set<int> * loadedFromTrash)
00302 {
00303 UDEBUG("");
00304
00305 std::list<int> ids = signIds;
00306 std::list<Signature*>::iterator sIter;
00307 bool valueFound = false;
00308 _trashesMutex.lock();
00309 {
00310 for(std::list<int>::iterator iter = ids.begin(); iter != ids.end();)
00311 {
00312 valueFound = false;
00313 for(std::map<int, Signature*>::iterator sIter = _trashSignatures.begin(); sIter!=_trashSignatures.end();)
00314 {
00315 if(sIter->first == *iter)
00316 {
00317 signatures.push_back(sIter->second);
00318 _trashSignatures.erase(sIter++);
00319
00320 valueFound = true;
00321 break;
00322 }
00323 else
00324 {
00325 ++sIter;
00326 }
00327 }
00328 if(valueFound)
00329 {
00330 if(loadedFromTrash)
00331 {
00332 loadedFromTrash->insert(*iter);
00333 }
00334 iter = ids.erase(iter);
00335 }
00336 else
00337 {
00338 ++iter;
00339 }
00340 }
00341 }
00342 _trashesMutex.unlock();
00343 UDEBUG("");
00344 if(ids.size())
00345 {
00346 _dbSafeAccessMutex.lock();
00347 this->loadSignaturesQuery(ids, signatures);
00348 _dbSafeAccessMutex.unlock();
00349 }
00350 }
00351
00352 void DBDriver::loadWords(const std::set<int> & wordIds, std::list<VisualWord *> & vws)
00353 {
00354
00355 std::set<int> ids = wordIds;
00356 std::map<int, VisualWord*>::iterator wIter;
00357 std::list<VisualWord *> puttedBack;
00358 _trashesMutex.lock();
00359 {
00360 if(_trashVisualWords.size())
00361 {
00362 for(std::set<int>::iterator iter = ids.begin(); iter != ids.end();)
00363 {
00364 wIter = _trashVisualWords.find(*iter);
00365 if(wIter != _trashVisualWords.end())
00366 {
00367 UDEBUG("put back word %d from trash", *iter);
00368 puttedBack.push_back(wIter->second);
00369 _trashVisualWords.erase(wIter);
00370 ids.erase(iter++);
00371 }
00372 else
00373 {
00374 ++iter;
00375 }
00376 }
00377 }
00378 }
00379 _trashesMutex.unlock();
00380 if(ids.size())
00381 {
00382 _dbSafeAccessMutex.lock();
00383 this->loadWordsQuery(ids, vws);
00384 _dbSafeAccessMutex.unlock();
00385 uAppend(vws, puttedBack);
00386 }
00387 else if(puttedBack.size())
00388 {
00389 uAppend(vws, puttedBack);
00390 }
00391 }
00392
00393 void DBDriver::loadNodeData(std::list<Signature *> & signatures, bool loadMetricData) const
00394 {
00395
00396
00397 _trashesMutex.lock();
00398 if(_trashSignatures.size())
00399 {
00400 for(std::list<Signature *>::iterator iter=signatures.begin(); iter!=signatures.end(); ++iter)
00401 {
00402 UASSERT(*iter != 0);
00403 UASSERT_MSG(!uContains(_trashSignatures, (*iter)->id()), uFormat("Signature %d should not be used when transferred to trash!!!!", (*iter)->id()).c_str());
00404 }
00405 }
00406 _trashesMutex.unlock();
00407
00408 _dbSafeAccessMutex.lock();
00409 this->loadNodeDataQuery(signatures, loadMetricData);
00410 _dbSafeAccessMutex.unlock();
00411 }
00412
00413 void DBDriver::getNodeData(
00414 int signatureId,
00415 cv::Mat & imageCompressed,
00416 cv::Mat & depthCompressed,
00417 cv::Mat & laserScanCompressed,
00418 float & fx,
00419 float & fy,
00420 float & cx,
00421 float & cy,
00422 Transform & localTransform,
00423 int & laserScanMaxPts) const
00424 {
00425 bool found = false;
00426
00427 _trashesMutex.lock();
00428 if(uContains(_trashSignatures, signatureId))
00429 {
00430 const Signature * s = _trashSignatures.at(signatureId);
00431 if(!s->getImageCompressed().empty() || !s->isSaved())
00432 {
00433 imageCompressed = s->getImageCompressed();
00434 depthCompressed = s->getDepthCompressed();
00435 laserScanCompressed = s->getLaserScanCompressed();
00436 fx = s->getFx();
00437 fy = s->getFy();
00438 cx = s->getCx();
00439 cy = s->getCy();
00440 localTransform = s->getLocalTransform();
00441 laserScanMaxPts = s->getLaserScanMaxPts();
00442 found = true;
00443 }
00444 }
00445 _trashesMutex.unlock();
00446
00447 if(!found)
00448 {
00449 _dbSafeAccessMutex.lock();
00450 this->getNodeDataQuery(signatureId, imageCompressed, depthCompressed, laserScanCompressed, fx, fy, cx, cy, localTransform, laserScanMaxPts);
00451 _dbSafeAccessMutex.unlock();
00452 }
00453 }
00454
00455 void DBDriver::getNodeData(int signatureId, cv::Mat & imageCompressed) const
00456 {
00457 bool found = false;
00458
00459 _trashesMutex.lock();
00460 if(uContains(_trashSignatures, signatureId))
00461 {
00462 const Signature * s = _trashSignatures.at(signatureId);
00463 if(!s->getImageCompressed().empty() || !s->isSaved())
00464 {
00465 imageCompressed = s->getImageCompressed();
00466 found = true;
00467 }
00468 }
00469 _trashesMutex.unlock();
00470
00471 if(!found)
00472 {
00473 _dbSafeAccessMutex.lock();
00474 this->getNodeDataQuery(signatureId, imageCompressed);
00475 _dbSafeAccessMutex.unlock();
00476 }
00477 }
00478
00479 bool DBDriver::getNodeInfo(int signatureId,
00480 Transform & pose,
00481 int & mapId,
00482 int & weight,
00483 std::string & label,
00484 double & stamp,
00485 std::vector<unsigned char> & userData) const
00486 {
00487 bool found = false;
00488
00489 _trashesMutex.lock();
00490 if(uContains(_trashSignatures, signatureId))
00491 {
00492 pose = _trashSignatures.at(signatureId)->getPose();
00493 mapId = _trashSignatures.at(signatureId)->mapId();
00494 weight = _trashSignatures.at(signatureId)->getWeight();
00495 label = _trashSignatures.at(signatureId)->getLabel();
00496 stamp = _trashSignatures.at(signatureId)->getStamp();
00497 userData = _trashSignatures.at(signatureId)->getUserData();
00498 found = true;
00499 }
00500 _trashesMutex.unlock();
00501
00502 if(!found)
00503 {
00504 _dbSafeAccessMutex.lock();
00505 found = this->getNodeInfoQuery(signatureId, pose, mapId, weight, label, stamp, userData);
00506 _dbSafeAccessMutex.unlock();
00507 }
00508 return found;
00509 }
00510
00511 void DBDriver::loadLinks(int signatureId, std::map<int, Link> & links, Link::Type type) const
00512 {
00513 bool found = false;
00514
00515 _trashesMutex.lock();
00516 if(uContains(_trashSignatures, signatureId))
00517 {
00518 const Signature * s = _trashSignatures.at(signatureId);
00519 UASSERT(s != 0);
00520 for(std::map<int, Link>::const_iterator nIter = s->getLinks().begin();
00521 nIter!=s->getLinks().end();
00522 ++nIter)
00523 {
00524 if(type == Link::kUndef || nIter->second.type() == type)
00525 {
00526 links.insert(*nIter);
00527 }
00528 }
00529 found = true;
00530 }
00531 _trashesMutex.unlock();
00532
00533 if(!found)
00534 {
00535 _dbSafeAccessMutex.lock();
00536 this->loadLinksQuery(signatureId, links, type);
00537 _dbSafeAccessMutex.unlock();
00538 }
00539 }
00540
00541 void DBDriver::getWeight(int signatureId, int & weight) const
00542 {
00543 bool found = false;
00544
00545 _trashesMutex.lock();
00546 if(uContains(_trashSignatures, signatureId))
00547 {
00548 weight = _trashSignatures.at(signatureId)->getWeight();
00549 found = true;
00550 }
00551 _trashesMutex.unlock();
00552
00553 if(!found)
00554 {
00555 _dbSafeAccessMutex.lock();
00556 this->getWeightQuery(signatureId, weight);
00557 _dbSafeAccessMutex.unlock();
00558 }
00559 }
00560
00561 void DBDriver::getAllNodeIds(std::set<int> & ids, bool ignoreChildren) const
00562 {
00563
00564 _trashesMutex.lock();
00565 if(_trashSignatures.size())
00566 {
00567 for(std::map<int, Signature*>::const_iterator sIter = _trashSignatures.begin(); sIter!=_trashSignatures.end(); ++sIter)
00568 {
00569 bool hasNeighbors = !ignoreChildren;
00570 if(ignoreChildren)
00571 {
00572 for(std::map<int, Link>::const_iterator nIter = sIter->second->getLinks().begin();
00573 nIter!=sIter->second->getLinks().end();
00574 ++nIter)
00575 {
00576 if(nIter->second.type() == Link::kNeighbor)
00577 {
00578 hasNeighbors = true;
00579 break;
00580 }
00581 }
00582 }
00583 if(hasNeighbors)
00584 {
00585 ids.insert(sIter->first);
00586 }
00587 }
00588
00589 std::vector<int> keys = uKeys(_trashSignatures);
00590
00591 }
00592 _trashesMutex.unlock();
00593
00594 _dbSafeAccessMutex.lock();
00595 this->getAllNodeIdsQuery(ids, ignoreChildren);
00596 _dbSafeAccessMutex.unlock();
00597 }
00598
00599 void DBDriver::getLastNodeId(int & id) const
00600 {
00601
00602 _trashesMutex.lock();
00603 if(_trashSignatures.size())
00604 {
00605 id = _trashSignatures.rbegin()->first;
00606 }
00607 _trashesMutex.unlock();
00608
00609 _dbSafeAccessMutex.lock();
00610 this->getLastIdQuery("Node", id);
00611 _dbSafeAccessMutex.unlock();
00612 }
00613
00614 void DBDriver::getLastWordId(int & id) const
00615 {
00616
00617 _trashesMutex.lock();
00618 if(_trashVisualWords.size())
00619 {
00620 id = _trashVisualWords.rbegin()->first;
00621 }
00622 _trashesMutex.unlock();
00623
00624 _dbSafeAccessMutex.lock();
00625 this->getLastIdQuery("Word", id);
00626 _dbSafeAccessMutex.unlock();
00627 }
00628
00629 void DBDriver::getInvertedIndexNi(int signatureId, int & ni) const
00630 {
00631 bool found = false;
00632
00633 _trashesMutex.lock();
00634 if(uContains(_trashSignatures, signatureId))
00635 {
00636 ni = _trashSignatures.at(signatureId)->getWords().size();
00637 found = true;
00638 }
00639 _trashesMutex.unlock();
00640
00641 if(!found)
00642 {
00643 _dbSafeAccessMutex.lock();
00644 this->getInvertedIndexNiQuery(signatureId, ni);
00645 _dbSafeAccessMutex.unlock();
00646 }
00647 }
00648
00649 void DBDriver::getNodeIdByLabel(const std::string & label, int & id) const
00650 {
00651 if(!label.empty())
00652 {
00653 int idFound = 0;
00654
00655 _trashesMutex.lock();
00656 for(std::map<int, Signature*>::const_iterator sIter = _trashSignatures.begin(); sIter!=_trashSignatures.end(); ++sIter)
00657 {
00658 if(sIter->second->getLabel().compare(label) == 0)
00659 {
00660 idFound = sIter->first;
00661 break;
00662 }
00663 }
00664 _trashesMutex.unlock();
00665
00666
00667 if(idFound == 0)
00668 {
00669 _dbSafeAccessMutex.lock();
00670 this->getNodeIdByLabelQuery(label, id);
00671 _dbSafeAccessMutex.unlock();
00672 }
00673 else
00674 {
00675 id = idFound;
00676 }
00677 }
00678 else
00679 {
00680 UWARN("Can't search with an empty label!");
00681 }
00682 }
00683
00684 void DBDriver::getAllLabels(std::map<int, std::string> & labels) const
00685 {
00686
00687 _trashesMutex.lock();
00688 for(std::map<int, Signature*>::const_iterator sIter = _trashSignatures.begin(); sIter!=_trashSignatures.end(); ++sIter)
00689 {
00690 if(!sIter->second->getLabel().empty())
00691 {
00692 labels.insert(std::make_pair(sIter->first, sIter->second->getLabel()));
00693 }
00694 }
00695 _trashesMutex.unlock();
00696
00697
00698 _dbSafeAccessMutex.lock();
00699 this->getAllLabelsQuery(labels);
00700 _dbSafeAccessMutex.unlock();
00701 }
00702
00703 void DBDriver::addStatisticsAfterRun(int stMemSize, int lastSignAdded, int processMemUsed, int databaseMemUsed, int dictionarySize) const
00704 {
00705 ULOGGER_DEBUG("");
00706 if(this->isConnected())
00707 {
00708 std::stringstream query;
00709 query << "INSERT INTO Statistics(STM_size,last_sign_added,process_mem_used,database_mem_used,dictionary_size) values("
00710 << stMemSize << ","
00711 << lastSignAdded << ","
00712 << processMemUsed << ","
00713 << databaseMemUsed << ","
00714 << dictionarySize << ");";
00715
00716 this->executeNoResultQuery(query.str());
00717 }
00718 }
00719
00720 }