00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 #include "database_interface/postgresql_database.h"
00038
00039
00040 #include <libpq-fe.h>
00041
00042 #include <sstream>
00043 #include <iostream>
00044
00045 namespace database_interface {
00046
00047 void operator>>(const YAML::Node& node, PostgresqlDatabaseConfig &options)
00048 {
00049 #ifdef HAVE_NEW_YAMLCPP
00050 options.password_ = node["password"].as<std::string>();
00051 options.user_ = node["user"].as<std::string>();
00052 options.host_ = node["host"].as<std::string>();
00053 options.port_ = node["port"].as<std::string>();
00054 options.dbname_ = node["dbname"].as<std::string>();
00055 #else
00056 node["password"] >> options.password_;
00057 node["user"] >> options.user_;
00058 node["host"] >> options.host_;
00059 node["port"] >> options.port_;
00060 node["dbname"] >> options.dbname_;
00061 #endif
00062 }
00063
00068 class PostgresqlDatabase::PGresultAutoPtr
00069 {
00070 private:
00071 PGresult* result_;
00072 public:
00073 PGresultAutoPtr(PGresult *ptr) : result_(ptr){}
00074 ~PGresultAutoPtr(){PQclear(result_);}
00075 void reset(PGresult *ptr){PQclear(result_); result_=ptr;}
00076 PGresult* operator * (){return result_;}
00077 };
00078
00079
00080 void PostgresqlDatabase::pgMDBconstruct(std::string host, std::string port, std::string user,
00081 std::string password, std::string dbname )
00082 {
00083 std::string conn_info;
00084
00085 if (!host.empty()) conn_info += "host=" + host;
00086 if (!port.empty()) conn_info += " port=" + port;
00087 if (!user.empty()) conn_info += " user=" + user;
00088 if (!password.empty()) conn_info += " password=" + password;
00089 if (!dbname.empty()) conn_info += " dbname=" + dbname;
00090 connection_= PQconnectdb(conn_info.c_str());
00091 if (PQstatus(connection_)!=CONNECTION_OK)
00092 {
00093 ROS_ERROR("Database connection failed with error message: %s", PQerrorMessage(connection_));
00094 }
00095 }
00096
00097 PostgresqlDatabase::PostgresqlDatabase(const PostgresqlDatabaseConfig &config)
00098 : in_transaction_(false)
00099 {
00100 pgMDBconstruct(config.getHost(), config.getPort(), config.getUser(),
00101 config.getPassword(), config.getDBname());
00102 }
00103
00104 PostgresqlDatabase::PostgresqlDatabase(std::string host, std::string port, std::string user,
00105 std::string password, std::string dbname )
00106 : in_transaction_(false)
00107 {
00108 pgMDBconstruct(host, port, user, password, dbname);
00109 }
00110
00111 PostgresqlDatabase::~PostgresqlDatabase()
00112 {
00113 PQfinish(connection_);
00114 }
00115
00116 bool PostgresqlDatabase::isConnected() const
00117 {
00118 if (PQstatus(connection_)==CONNECTION_OK) return true;
00119 else return false;
00120 }
00121
00123 bool PostgresqlDatabase::rollback()
00124 {
00125 PGresultAutoPtr result((PQexec(connection_,"ROLLBACK;")));
00126 if (PQresultStatus(*result) != PGRES_COMMAND_OK)
00127 {
00128 ROS_ERROR("Rollback failed");
00129 return false;
00130 }
00131 in_transaction_ = false;
00132 return true;
00133 }
00134
00136 bool PostgresqlDatabase::begin()
00137 {
00138 if( in_transaction_ ) return true;
00139
00140 PGresultAutoPtr result(PQexec(connection_, "BEGIN;"));
00141 if (PQresultStatus(*result) != PGRES_COMMAND_OK)
00142 {
00143 ROS_ERROR("Database begin query failed. Error: %s", PQresultErrorMessage(*result));
00144 return false;
00145 }
00146 in_transaction_ = true;
00147 return true;
00148 }
00149
00151 bool PostgresqlDatabase::commit()
00152 {
00153 PGresultAutoPtr result(PQexec(connection_, "COMMIT;"));
00154 if (PQresultStatus(*result) != PGRES_COMMAND_OK)
00155 {
00156 ROS_ERROR("Database commit query failed. Error: %s", PQresultErrorMessage(*result));
00157 return false;
00158 }
00159 in_transaction_ = false;
00160 return true;
00161 }
00162
00163 bool PostgresqlDatabase::getVariable(std::string name, std::string &value) const
00164 {
00165 std::string query("SELECT variable_value FROM variable WHERE variable_name=" + name);
00166 PGresultAutoPtr result(PQexec(connection_, query.c_str()));
00167 if (PQresultStatus(*result) != PGRES_TUPLES_OK)
00168 {
00169 ROS_ERROR("Database get variable query failed. Error: %s", PQresultErrorMessage(*result));
00170 return false;
00171 }
00172 if (PQntuples(*result)==0)
00173 {
00174 ROS_ERROR("Database get variable query failed. Variable %s not in database", name.c_str());
00175 return false;
00176 }
00177 value = PQgetvalue(*result, 0, 0);
00178 return true;
00179 }
00180
00184 bool PostgresqlDatabase::getSequence(std::string name, std::string &value)
00185 {
00186 std::string query("SELECT * FROM currval('" + name + "');");
00187 PGresultAutoPtr result( PQexec(connection_, query.c_str()) );
00188 if (PQresultStatus(*result) != PGRES_TUPLES_OK)
00189 {
00190 ROS_ERROR("Get sequence: query failed. Error: %s", PQresultErrorMessage(*result));
00191 return false;
00192 }
00193 if (!PQntuples(*result))
00194 {
00195 ROS_ERROR("Get sequence: sequence %s not found", name.c_str());
00196 return false;
00197 }
00198 const char *id_char = PQgetvalue(*result, 0, 0);
00199 value.assign(id_char);
00200 return true;
00201 }
00202
00212 bool PostgresqlDatabase::getListRawResult(const DBClass *example,
00213 std::vector<const DBFieldBase*> &fields,
00214 std::vector<int> &column_ids,
00215 std::string where_clause,
00216 boost::shared_ptr<PGresultAutoPtr> &result, int &num_tuples) const
00217 {
00218
00219
00220 if(example->getPrimaryKeyField()->getType() == DBFieldBase::BINARY)
00221 {
00222 ROS_ERROR("Database get list: can not use binary primary key (%s)",
00223 example->getPrimaryKeyField()->getName().c_str());
00224 return false;
00225 }
00226
00227 std::string select_query;
00228 select_query += "SELECT " + example->getPrimaryKeyField()->getName() + " ";
00229 fields.push_back(example->getPrimaryKeyField());
00230
00231
00232 std::vector<std::string> join_tables;
00233 std::string join_clauses;
00234 for (size_t i=0; i<example->getNumFields(); i++)
00235 {
00236 if (!example->getField(i)->getReadFromDatabase()) continue;
00237
00238 if (example->getField(i)->getType()==DBFieldBase::BINARY)
00239 {
00240 ROS_WARN("Database get list: binary field (%s) can not be loaded by default",
00241 example->getField(i)->getName().c_str());
00242 continue;
00243 }
00244
00245 select_query += ", " + example->getField(i)->getName();
00246 fields.push_back(example->getField(i));
00247 if ( example->getField(i)->getTableName() != example->getPrimaryKeyField()->getTableName() )
00248 {
00249
00250 bool already_join = false;
00251 for (size_t j=0; j<join_tables.size() && !already_join; j++)
00252 {
00253 if (join_tables[j] == example->getField(i)->getTableName()) already_join = true;
00254 }
00255
00256 if (!already_join)
00257 {
00258 const DBFieldBase *foreign_key = NULL;
00259 if (!example->getForeignKey(example->getField(i)->getTableName(), foreign_key))
00260 {
00261 ROS_ERROR("Database get list: could not find foreign key for table %s",
00262 example->getField(i)->getTableName().c_str());
00263 return false;
00264 }
00265 join_clauses += " JOIN " + example->getField(i)->getTableName() + " USING ("
00266 + foreign_key->getName() + ") ";
00267 join_tables.push_back( example->getField(i)->getTableName() );
00268 }
00269 }
00270 }
00271
00272 select_query += " FROM " + example->getPrimaryKeyField()->getTableName() + " ";
00273
00274 if (!join_clauses.empty())
00275 {
00276 select_query += join_clauses;
00277 }
00278
00279 if (!where_clause.empty())
00280 {
00281 select_query += " WHERE " + where_clause;
00282 }
00283
00284 select_query += ";";
00285
00286
00287
00288 PGresult* raw_result = PQexec(connection_, select_query.c_str());
00289 result.reset( new PGresultAutoPtr(raw_result) );
00290 if (PQresultStatus(raw_result) != PGRES_TUPLES_OK)
00291 {
00292 ROS_ERROR("Database get list: query failed. Error: %s", PQresultErrorMessage(raw_result));
00293 return false;
00294 }
00295
00296 num_tuples = PQntuples(raw_result);
00297 if (!num_tuples)
00298 {
00299 return true;
00300 }
00301
00302
00303 for (size_t t=0; t<fields.size(); t++)
00304 {
00305 int id = PQfnumber(raw_result, fields[t]->getName().c_str());
00306 if (id < 0)
00307 {
00308 ROS_ERROR("Database get list: column %s missing in result", fields[t]->getName().c_str());
00309 return false;
00310 }
00311 column_ids.push_back(id);
00312 }
00313 return true;
00314 }
00315
00320 bool PostgresqlDatabase::populateListEntry(DBClass *entry, boost::shared_ptr<PGresultAutoPtr> result,
00321 int row_num,
00322 const std::vector<const DBFieldBase*> &fields,
00323 const std::vector<int> &column_ids) const
00324 {
00325 for (size_t t=0; t<fields.size(); t++)
00326 {
00327 const char* char_value = PQgetvalue(**result, row_num, column_ids[t]);
00328 DBFieldBase *entry_field = entry->getField(fields[t]->getName());
00329 if (!entry_field)
00330 {
00331 ROS_ERROR("Database get list: new entry missing field %s", fields[t]->getName().c_str());
00332 return false;
00333 }
00334 if ( !entry_field->fromString(char_value) )
00335 {
00336 ROS_ERROR("Database get list: failed to parse response \"%s\" for field \"%s\"",
00337 char_value, fields[t]->getName().c_str());
00338 return false;
00339 }
00340 }
00341 return true;
00342 }
00343
00350 bool PostgresqlDatabase::countList(const DBClass *example, int &count, std::string where_clause) const
00351 {
00352 const DBFieldBase* pk_field = example->getPrimaryKeyField();
00353
00354 std::string query( "SELECT COUNT(" + pk_field->getName() + ") FROM " + pk_field->getTableName() );
00355 if (!where_clause.empty())
00356 {
00357 query += " WHERE " + where_clause;
00358 }
00359 query += ";";
00360
00361 ROS_INFO("Query (count): %s", query.c_str());
00362 PGresultAutoPtr result( PQexec(connection_, query.c_str()) );
00363
00364 if (PQresultStatus(*result) != PGRES_TUPLES_OK)
00365 {
00366 ROS_ERROR("Database count list query failed. Error: %s", PQresultErrorMessage(*result));
00367 return false;
00368 }
00369 const char *reply = PQgetvalue(*result, 0, 0);
00370 if (!DBStreamable<int>::streamableFromString(count, reply))
00371 {
00372 ROS_ERROR("Database count list failed. Could not understand reply: %s", reply);
00373 return false;
00374 }
00375 return true;
00376 }
00377
00384 bool PostgresqlDatabase::saveToDatabase(const DBFieldBase* field)
00385 {
00386 if (!field->getWritePermission())
00387 {
00388 ROS_ERROR("Database save field: field %s does not have write permission", field->getName().c_str());
00389 return false;
00390 }
00391
00392 const DBFieldBase* key_field;
00393 if (field->getTableName() == field->getOwner()->getPrimaryKeyField()->getTableName())
00394 {
00395 key_field = field->getOwner()->getPrimaryKeyField();
00396 }
00397 else
00398 {
00399 if (!field->getOwner()->getForeignKey(field->getTableName(), key_field))
00400 {
00401 ROS_ERROR("Database save field: could not find foreign key for table %s",
00402 field->getTableName().c_str());
00403 return false;
00404 }
00405
00406
00407 }
00408
00409
00410
00411 std::string query("UPDATE " + field->getTableName() +
00412 " SET " + field->getName() + "=$2"
00413 " WHERE " + key_field->getName() + "=$1;");
00414
00415 std::vector<const char*> param_values(2);
00416 std::vector<int> param_lengths(2);
00417 std::vector<int> param_formats(2);
00418
00419 std::string id_str;
00420 if (!key_field->toString(id_str))
00421 {
00422 ROS_ERROR("Database save field: failed to convert key id value to string");
00423 return false;
00424 }
00425 param_formats[0] = 0;
00426 param_values[0] = id_str.c_str();
00427
00428
00429 std::string value_str;
00430 if (field->getType() == DBFieldBase::TEXT)
00431 {
00432 if (!field->toString(value_str))
00433 {
00434 ROS_ERROR("Database save field: failed to convert field value to string");
00435 return false;
00436 }
00437 param_formats[1] = 0;
00438 param_values[1] = value_str.c_str();
00439 }
00440 else if (field->getType() == DBFieldBase::BINARY)
00441 {
00442 size_t length;
00443 if (!field->toBinary(param_values[1], length))
00444 {
00445 ROS_ERROR("Database save field: failed to convert field value to binary");
00446 return false;
00447 }
00448 param_lengths[1] = length;
00449 param_formats[1] = 1;
00450 }
00451 else
00452 {
00453 ROS_ERROR("Database save field: unkown field type");
00454 return false;
00455 }
00456
00457
00458
00459 PGresultAutoPtr result( PQexecParams(connection_, query.c_str(), 2,
00460 NULL, ¶m_values[0], ¶m_lengths[0], ¶m_formats[0], 0) );
00461 if (PQresultStatus(*result) != PGRES_COMMAND_OK)
00462 {
00463 ROS_ERROR("Database save field: query failed. Error: %s", PQresultErrorMessage(*result));
00464 return false;
00465 }
00466 return true;
00467 }
00468
00474 bool PostgresqlDatabase::loadFromDatabase(DBFieldBase* field) const
00475 {
00476 const DBFieldBase* key_field = NULL;
00477 if (field->getTableName() == field->getOwner()->getPrimaryKeyField()->getTableName())
00478 {
00479 key_field = field->getOwner()->getPrimaryKeyField();
00480 }
00481 else
00482 {
00483 if (!field->getOwner()->getForeignKey(field->getTableName(), key_field))
00484 {
00485 ROS_ERROR("Database load field: could not find foreign key for table %s",
00486 field->getTableName().c_str());
00487 return false;
00488 }
00489 }
00490
00491 std::string id_str;
00492 if (!key_field->toString(id_str))
00493 {
00494 ROS_ERROR("Database load field: failed to convert key id value to string");
00495 return false;
00496 }
00497
00498 std::string query("SELECT " + field->getName() + " FROM " + field->getTableName() +
00499 " WHERE " + key_field->getName() + " ='" + id_str + "';");
00500
00501
00502
00503 int data_type;
00504 if (field->getType() == DBFieldBase::TEXT) data_type = 0;
00505 else if (field->getType() == DBFieldBase::BINARY) data_type = 1;
00506 else
00507 {
00508 ROS_ERROR("Database load field: unkown field type");
00509 return false;
00510 }
00511
00512 PGresultAutoPtr result( PQexecParams(connection_, query.c_str(), 0, NULL, NULL, NULL, NULL, data_type) );
00513 if (PQresultStatus(*result) != PGRES_TUPLES_OK)
00514 {
00515 ROS_ERROR("Database load field: query failed. Error: %s", PQresultErrorMessage(*result));
00516 return false;
00517 }
00518
00519 if (PQntuples(*result)==0)
00520 {
00521 ROS_ERROR("Database load field: no entry found for key value %s on column %s",
00522 id_str.c_str(), key_field->getName().c_str());
00523 return false;
00524 }
00525
00526 const char* result_char = PQgetvalue(*result, 0, 0);
00527 if (field->getType() == DBFieldBase::TEXT)
00528 {
00529 if ( !field->fromString(result_char) )
00530 {
00531 ROS_ERROR("Database load field: failed to parse text result \"%s\" for field \"%s\"",
00532 result_char, field->getName().c_str());
00533 return false;
00534 }
00535 }
00536 else if (field->getType() == DBFieldBase::BINARY)
00537 {
00538 size_t length = PQgetlength(*result, 0, 0);
00539 if (!field->fromBinary(result_char, length))
00540 {
00541 ROS_ERROR("Database load field: failed to parse binary result length %d for field \"%s\"",
00542 (int) length, field->getName().c_str());
00543 return false;
00544 }
00545 }
00546 else
00547 {
00548 ROS_ERROR("Database load field: failed to parse unkown field type");
00549 return false;
00550 }
00551
00552 return true;
00553 }
00554
00563 bool PostgresqlDatabase::insertIntoTable(std::string table_name,
00564 const std::vector<const DBFieldBase*> &fields)
00565 {
00566 if (fields.empty())
00567 {
00568 ROS_ERROR("Insert into table: no columns to insert");
00569 return false;
00570 }
00571
00572 std::string query("INSERT INTO " + table_name + "(");
00573
00574
00575 if (table_name == fields[0]->getTableName())
00576 {
00577
00578 query += fields[0]->getName();
00579 }
00580 else
00581 {
00582
00583 const DBFieldBase *foreign_key = NULL;
00584 if (!fields[0]->getOwner()->getForeignKey(table_name, foreign_key))
00585 {
00586 ROS_ERROR("Database insert into table: could not find foreign key for table %s", table_name.c_str());
00587 return false;
00588 }
00589 query += foreign_key->getName();
00590 }
00591
00592 for(size_t i=1; i<fields.size(); i++)
00593 {
00594 query += "," + fields[i]->getName();
00595 }
00596 query += ")";
00597
00598 query += "VALUES(";
00599 for (size_t i=0; i<fields.size(); i++)
00600 {
00601 if ( i!=0 && table_name!=fields[i]->getTableName())
00602 {
00603 ROS_ERROR("Database insert into table: field table does not match table name");
00604 return false;
00605 }
00606 if (i!=0) query += ",";
00607 std::ostringstream ss;
00608 ss << i+1;
00609 query += "$" + ss.str();
00610 }
00611 query += ");";
00612
00613
00614
00615
00616 std::vector<std::string> param_strings(fields.size());
00617 std::vector<const char*> param_values(fields.size());
00618 std::vector<int> param_lengths(fields.size());
00619 std::vector<int> param_formats(fields.size());
00620 for (size_t i=0; i<fields.size(); i++)
00621 {
00622 if (fields[i]->getType() == DBFieldBase::TEXT)
00623 {
00624 if (!fields[i]->toString(param_strings[i]))
00625 {
00626 ROS_ERROR("Database insert into table: could not parse field %s", fields[i]->getName().c_str());
00627 return false;
00628 }
00629 param_values[i] = param_strings[i].c_str();
00630 param_formats[i] = 0;
00631 }
00632 else if (fields[i]->getType() == DBFieldBase::BINARY)
00633 {
00634 size_t length;
00635 if (!fields[i]->toBinary(param_values[i], length))
00636 {
00637 ROS_ERROR("Database insert into table: could not binarize field %s", fields[i]->getName().c_str());
00638 return false;
00639 }
00640 param_lengths[i] = length;
00641 param_formats[i] = 1;
00642 }
00643 else
00644 {
00645 ROS_ERROR("Database insert into table: unknown field type");
00646 return false;
00647 }
00648 }
00649
00650
00651 PGresultAutoPtr result( PQexecParams(connection_, query.c_str(), fields.size(), NULL,
00652 &(param_values[0]), &(param_lengths[0]), &(param_formats[0]), 0) );
00653 if (PQresultStatus(*result) != PGRES_COMMAND_OK)
00654 {
00655 ROS_ERROR("Database insert into table: query failed.\nError: %s.\nQuery: %s",
00656 PQresultErrorMessage(*result), query.c_str());
00657 return false;
00658 }
00659
00660 return true;
00661 }
00662
00669 bool PostgresqlDatabase::insertIntoDatabase(DBClass* instance)
00670 {
00671
00672 DBFieldBase* pk_field = instance->getPrimaryKeyField();
00673 if (pk_field->getType() != DBFieldBase::TEXT)
00674 {
00675 ROS_ERROR("Database insert: cannot insert binary primary key %s", pk_field->getName().c_str());
00676 return false;
00677 }
00678
00679
00680 std::vector<std::string> table_names;
00681 std::vector< std::vector<const DBFieldBase*> > table_fields;
00682
00683
00684 table_names.push_back(pk_field->getTableName());
00685 table_fields.push_back( std::vector<const DBFieldBase*>() );
00686
00687 bool insert_pk;
00688 if (pk_field->getWriteToDatabase())
00689 {
00690
00691 table_fields.back().push_back(pk_field);
00692 insert_pk = true;
00693 }
00694 else
00695 {
00696
00697
00698 if (pk_field->getSequenceName().empty())
00699 {
00700 ROS_ERROR("Database insert: attempt to insert instance without primary key and no sequence for it");
00701 return false;
00702 }
00703 insert_pk = false;
00704 }
00705
00706
00707
00708
00709 for (size_t i=0; i<instance->getNumFields(); i++)
00710 {
00711
00712 bool found = false;
00713 size_t t;
00714 for (t=0; t<table_names.size(); t++)
00715 {
00716 if ( table_names[t] == instance->getField(i)->getTableName() )
00717 {
00718 found = true;
00719 break;
00720 }
00721 }
00722 if (!found)
00723 {
00724
00725 const DBFieldBase* foreign_key = NULL;
00726 if (!instance->getForeignKey(instance->getField(i)->getTableName(), foreign_key))
00727 {
00728 ROS_ERROR("Database insert into table: could not find foreign key for table %s",
00729 instance->getField(i)->getTableName().c_str());
00730 return false;
00731 }
00732 if (foreign_key != pk_field)
00733 {
00734 continue;
00735 }
00736
00737 table_names.push_back( instance->getField(i)->getTableName() );
00738 table_fields.push_back( std::vector<const DBFieldBase*>() );
00739
00740
00741 table_fields.back().push_back(pk_field);
00742 t = table_names.size() - 1;
00743 }
00744 if ( !instance->getField(i)->getWriteToDatabase() ) continue;
00745 if ( instance->getField(i)->getType() != DBFieldBase::TEXT )
00746 {
00747 ROS_WARN("Database insert: cannot insert binary field %s in database",
00748 instance->getField(i)->getName().c_str());
00749 continue;
00750 }
00751
00752 table_fields[t].push_back(instance->getField(i));
00753 }
00754
00755
00756 if (!begin()) return false;
00757
00758
00759 if (!insertIntoTable(table_names[0], table_fields[0]))
00760 {
00761 rollback();
00762 return false;
00763 }
00764
00765
00766 if (!insert_pk)
00767 {
00768 std::string sequence_value;
00769 if (!getSequence(pk_field->getSequenceName(), sequence_value) || !pk_field->fromString(sequence_value))
00770 {
00771 ROS_ERROR("Database insert: failed to retrieve primary key after insertion");
00772 rollback();
00773 return false;
00774 }
00775 }
00776
00777
00778 for (size_t i=1; i<table_names.size(); i++)
00779 {
00780 if (!insertIntoTable(table_names[i], table_fields[i]))
00781 {
00782 rollback();
00783 return false;
00784 }
00785 }
00786
00787
00788 if (!commit()) return false;
00789
00790 return true;
00791 }
00792
00794 bool PostgresqlDatabase::deleteFromTable(std::string table_name, const DBFieldBase *key_field)
00795 {
00796 std::string id_str;
00797 if (!key_field->toString(id_str))
00798 {
00799 ROS_ERROR("Database delete from table: failed to convert key id value to string");
00800 return false;
00801 }
00802
00803 std::string query("DELETE FROM " + table_name + " WHERE " + key_field->getName() + "=" + id_str);
00804 PGresultAutoPtr result( PQexec(connection_, query.c_str()) );
00805 if (PQresultStatus(*result) != PGRES_COMMAND_OK)
00806 {
00807 ROS_ERROR("Database delete from table: query failed. Error: %s", PQresultErrorMessage(*result));
00808 return false;
00809 }
00810 return true;
00811 }
00812
00816 bool PostgresqlDatabase::deleteFromDatabase(DBClass* instance)
00817 {
00818 std::vector<std::string> table_names;
00819 std::vector<const DBFieldBase*> table_fields;
00820 DBFieldBase* pk_field = instance->getPrimaryKeyField();
00821
00822 table_names.push_back( pk_field->getTableName() );
00823 table_fields.push_back( pk_field );
00824
00825 for (size_t i=0; i<instance->getNumFields(); i++)
00826 {
00827
00828 size_t t;
00829 for (t=0; t<table_names.size(); t++)
00830 {
00831 if ( table_names[t] == instance->getField(i)->getTableName() ) break;
00832 }
00833 if (t<table_names.size()) continue;
00834
00835
00836 const DBFieldBase* foreign_key = NULL;
00837 if (!instance->getForeignKey(instance->getField(i)->getTableName(), foreign_key))
00838 {
00839 ROS_ERROR("Database insert into table: could not find foreign key for table %s",
00840 instance->getField(i)->getTableName().c_str());
00841 return false;
00842 }
00843 if (foreign_key != pk_field) continue;
00844
00845
00846 table_names.push_back( instance->getField(i)->getTableName() );
00847 table_fields.push_back( pk_field );
00848 }
00849
00850
00851 if (!begin()) return false;
00852
00853
00854 for (int i=(int)table_names.size()-1; i>=0; i--)
00855 {
00856 if (!deleteFromTable(table_names[i], table_fields[i]))
00857 {
00858 rollback();
00859 return false;
00860 }
00861 }
00862
00863
00864 if (!commit()) return false;
00865
00866 return true;
00867
00868 }
00869
00871 bool PostgresqlDatabase::listenToChannel(std::string channel) {
00872 std::string query = "LISTEN " + channel;
00873 PGresultAutoPtr result = PQexec(connection_,query.c_str());
00874 if (PQresultStatus(*result) != PGRES_COMMAND_OK)
00875 {
00876 ROS_WARN("LISTEN command failed: %s", PQerrorMessage(connection_));
00877 return false;
00878 }
00879 ROS_INFO("Now listening to channel \"%s\"",channel.c_str());
00880 return true;
00881 }
00882
00884 bool PostgresqlDatabase::unlistenToChannel(std::string channel)
00885 {
00886 std::string query = "UNLISTEN " + channel + " ;";
00887 PGresultAutoPtr result = PQexec(connection_,query.c_str());
00888 if (PQresultStatus(*result) != PGRES_COMMAND_OK)
00889 {
00890 ROS_WARN("UNLISTEN command failed: %s", PQerrorMessage(connection_));
00891 return false;
00892 }
00893 ROS_INFO("Not listening to channel \"%s\" anymore.",channel.c_str());
00894 return true;
00895 }
00896
00901 bool PostgresqlDatabase::checkNotify(Notification &no)
00902 {
00903 PGnotify *notify;
00904
00905 if (!PQconsumeInput(connection_))
00906 {
00907 ROS_ERROR("Consume input failed with error message: %s", PQerrorMessage(connection_));
00908 return false;
00909 }
00910
00911 if ((notify = PQnotifies(connection_)) != NULL)
00912 {
00913 no.channel = notify->relname;
00914 no.sending_pid = notify->be_pid;
00915 no.payload = notify->extra;
00916 PQfreemem(notify);
00917 }
00918 else
00919 {
00920 no.channel = "";
00921 no.sending_pid = 0;
00922 no.payload = "";
00923 PQfreemem(notify);
00924 }
00925 return true;
00926 }
00927
00929 bool PostgresqlDatabase::waitForNotify(Notification &no)
00930 {
00931 int sock;
00932 fd_set input_mask;
00933 while (true)
00934 {
00935
00936 sock = PQsocket(connection_);
00937 if (sock < 0)
00938 {
00939 break;
00940 }
00941 FD_ZERO(&input_mask);
00942 FD_SET(sock, &input_mask);
00943
00944 if (select(sock + 1, &input_mask, NULL, NULL, NULL) < 0)
00945 {
00946 ROS_WARN("Select() on the database connection failed: %s\n", strerror(errno));
00947 break;
00948 }
00949
00950 if (!checkNotify(no)) return false;
00951
00952 if (no.sending_pid != 0) return true;
00953 }
00954 return false;
00955 }
00956
00957 }