postgresql_database.cpp
Go to the documentation of this file.
00001 /*********************************************************************
00002  * Software License Agreement (BSD License)
00003  *
00004  *  Copyright (c) 2009, Willow Garage, Inc.
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions
00009  *  are met:
00010  *
00011  *   * Redistributions of source code must retain the above copyright
00012  *     notice, this list of conditions and the following disclaimer.
00013  *   * Redistributions in binary form must reproduce the above
00014  *     copyright notice, this list of conditions and the following
00015  *     disclaimer in the documentation and/or other materials provided
00016  *     with the distribution.
00017  *   * Neither the name of the Willow Garage nor the names of its
00018  *     contributors may be used to endorse or promote products derived
00019  *     from this software without specific prior written permission.
00020  *
00021  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024  *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025  *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027  *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028  *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030  *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031  *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032  *  POSSIBILITY OF SUCH DAMAGE.
00033  *********************************************************************/
00034 
00035 // Author(s): Matei Ciocarlie
00036 
00037 #include "database_interface/postgresql_database.h"
00038 
00039 // the header of the libpq library
00040 #include <libpq-fe.h>
00041 
00042 #include <sstream>
00043 #include <iostream>
00044 
00045 namespace database_interface {
00046 
00047 void operator>>(const YAML::Node& node, PostgresqlDatabaseConfig &options)
00048 {
00049 #ifdef HAVE_NEW_YAMLCPP
00050   options.password_ = node["password"].as<std::string>();
00051   options.user_ = node["user"].as<std::string>();
00052   options.host_ = node["host"].as<std::string>();
00053   options.port_ = node["port"].as<std::string>();
00054   options.dbname_ = node["dbname"].as<std::string>();
00055 #else
00056   node["password"] >> options.password_;
00057   node["user"] >> options.user_;
00058   node["host"] >> options.host_;
00059   node["port"] >> options.port_;
00060   node["dbname"] >> options.dbname_;
00061 #endif
00062 }
00063 
00068 class PostgresqlDatabase::PGresultAutoPtr
00069 {
00070 private:
00071   PGresult* result_;
00072 public:
00073   PGresultAutoPtr(PGresult *ptr) : result_(ptr){}
00074   ~PGresultAutoPtr(){PQclear(result_);}
00075   void reset(PGresult *ptr){PQclear(result_); result_=ptr;}
00076   PGresult* operator * (){return result_;}
00077 };
00078 
00079 
00080 void PostgresqlDatabase::pgMDBconstruct(std::string host, std::string port, std::string user,
00081                                                  std::string password, std::string dbname )
00082 {
00083   std::string conn_info;
00084   //adding empty strings can cause weird things, as they are not expected to be empty
00085   if (!host.empty()) conn_info += "host=" + host;
00086   if (!port.empty()) conn_info += " port=" + port;
00087   if (!user.empty()) conn_info += " user=" + user;
00088   if (!password.empty()) conn_info += " password=" + password;
00089   if (!dbname.empty()) conn_info += " dbname=" + dbname;
00090   connection_= PQconnectdb(conn_info.c_str());
00091   if (PQstatus(connection_)!=CONNECTION_OK) 
00092   {
00093     ROS_ERROR("Database connection failed with error message: %s", PQerrorMessage(connection_));
00094   }
00095 }
00096 
00097 PostgresqlDatabase::PostgresqlDatabase(const PostgresqlDatabaseConfig &config)
00098   : in_transaction_(false)
00099 {
00100   pgMDBconstruct(config.getHost(), config.getPort(), config.getUser(), 
00101                  config.getPassword(), config.getDBname());
00102 }
00103 
00104 PostgresqlDatabase::PostgresqlDatabase(std::string host, std::string port, std::string user,
00105                                                  std::string password, std::string dbname )
00106   : in_transaction_(false)
00107 {
00108   pgMDBconstruct(host, port, user, password, dbname);
00109 }
00110 
00111 PostgresqlDatabase::~PostgresqlDatabase()
00112 {
00113   PQfinish(connection_);
00114 }
00115 
00116 bool PostgresqlDatabase::isConnected() const
00117 {
00118   if (PQstatus(connection_)==CONNECTION_OK) return true;
00119   else return false;
00120 }
00121 
00123 bool PostgresqlDatabase::rollback()
00124 {
00125   PGresultAutoPtr result((PQexec(connection_,"ROLLBACK;")));
00126   if (PQresultStatus(*result) != PGRES_COMMAND_OK)
00127   {
00128     ROS_ERROR("Rollback failed");
00129     return false;
00130   }
00131   in_transaction_ = false;
00132   return true;
00133 }
00134 
00136 bool PostgresqlDatabase::begin()
00137 {
00138   if( in_transaction_ ) return true;
00139   //place a begin
00140   PGresultAutoPtr result(PQexec(connection_, "BEGIN;"));
00141   if (PQresultStatus(*result) != PGRES_COMMAND_OK)
00142   {
00143     ROS_ERROR("Database begin query failed. Error: %s", PQresultErrorMessage(*result));
00144     return false;
00145   }
00146   in_transaction_ = true;
00147   return true;
00148 }
00149 
00151 bool PostgresqlDatabase::commit()
00152 {
00153   PGresultAutoPtr result(PQexec(connection_, "COMMIT;"));
00154   if (PQresultStatus(*result) != PGRES_COMMAND_OK)
00155   {
00156     ROS_ERROR("Database commit query failed. Error: %s", PQresultErrorMessage(*result));
00157     return false;
00158   }
00159   in_transaction_ = false;
00160   return true;
00161 }
00162 
00163 bool PostgresqlDatabase::getVariable(std::string name, std::string &value) const
00164 {
00165   std::string query("SELECT variable_value FROM variable WHERE variable_name=" + name);
00166   PGresultAutoPtr result(PQexec(connection_, query.c_str()));  
00167   if (PQresultStatus(*result) != PGRES_TUPLES_OK)
00168   {
00169     ROS_ERROR("Database get variable query failed. Error: %s", PQresultErrorMessage(*result));
00170     return false;
00171   }
00172   if (PQntuples(*result)==0)
00173   {
00174     ROS_ERROR("Database get variable query failed. Variable %s not in database", name.c_str());
00175     return false;
00176   }
00177   value = PQgetvalue(*result, 0, 0);
00178   return true;
00179 }
00180 
00184 bool PostgresqlDatabase::getSequence(std::string name, std::string &value)
00185 {
00186   std::string query("SELECT * FROM currval('" + name + "');");
00187   PGresultAutoPtr result( PQexec(connection_, query.c_str()) );
00188   if (PQresultStatus(*result) != PGRES_TUPLES_OK)
00189   {
00190     ROS_ERROR("Get sequence: query failed. Error: %s", PQresultErrorMessage(*result));
00191     return false;
00192   }
00193   if (!PQntuples(*result))
00194   {
00195     ROS_ERROR("Get sequence: sequence %s not found", name.c_str());
00196     return false;
00197   }
00198   const char *id_char = PQgetvalue(*result, 0, 0);
00199   value.assign(id_char);
00200   return true;
00201 }
00202 
00212 bool PostgresqlDatabase::getListRawResult(const DBClass *example, 
00213                                                    std::vector<const DBFieldBase*> &fields, 
00214                                                    std::vector<int> &column_ids,
00215                                                    std::string where_clause,
00216                                                    boost::shared_ptr<PGresultAutoPtr> &result, int &num_tuples) const
00217 {
00218   //we cannot handle binary results in here; libpq does not support binary results
00219   //for just part of the query, so they all have to be text
00220   if(example->getPrimaryKeyField()->getType() == DBFieldBase::BINARY)
00221   {
00222     ROS_ERROR("Database get list: can not use binary primary key (%s)", 
00223               example->getPrimaryKeyField()->getName().c_str());
00224     return false;
00225   }
00226 
00227   std::string select_query;
00228   select_query += "SELECT " + example->getPrimaryKeyField()->getName() + " ";  
00229   fields.push_back(example->getPrimaryKeyField());
00230 
00231   //we will store here the list of tables we will join on
00232   std::vector<std::string> join_tables;
00233   std::string join_clauses;
00234   for (size_t i=0; i<example->getNumFields(); i++)
00235   {
00236     if (!example->getField(i)->getReadFromDatabase()) continue;
00237 
00238     if (example->getField(i)->getType()==DBFieldBase::BINARY)
00239     {
00240       ROS_WARN("Database get list: binary field (%s) can not be loaded by default", 
00241                example->getField(i)->getName().c_str());
00242       continue;
00243     }
00244 
00245     select_query += ", " + example->getField(i)->getName();
00246     fields.push_back(example->getField(i));
00247     if ( example->getField(i)->getTableName() != example->getPrimaryKeyField()->getTableName() )
00248     {
00249       //check if we are already joining on this table
00250       bool already_join = false;
00251       for (size_t j=0; j<join_tables.size() && !already_join; j++)
00252       {
00253         if (join_tables[j] == example->getField(i)->getTableName()) already_join = true;
00254       }
00255       
00256       if (!already_join)
00257       {
00258         const DBFieldBase *foreign_key = NULL;
00259         if (!example->getForeignKey(example->getField(i)->getTableName(), foreign_key))
00260         {
00261           ROS_ERROR("Database get list: could not find foreign key for table %s", 
00262                     example->getField(i)->getTableName().c_str());
00263           return false;
00264         }
00265         join_clauses += " JOIN " + example->getField(i)->getTableName() + " USING (" 
00266           + foreign_key->getName() + ") ";
00267         join_tables.push_back( example->getField(i)->getTableName() );
00268       }
00269     }
00270   }
00271 
00272   select_query += " FROM " + example->getPrimaryKeyField()->getTableName() + " ";
00273 
00274   if (!join_clauses.empty())
00275   {
00276     select_query += join_clauses;
00277   }
00278 
00279   if (!where_clause.empty())
00280   {
00281     select_query += " WHERE " + where_clause;
00282   }
00283 
00284   select_query += ";";
00285 
00286   //ROS_INFO("Query: %s", select_query.c_str());
00287 
00288   PGresult* raw_result = PQexec(connection_, select_query.c_str());
00289   result.reset( new PGresultAutoPtr(raw_result) );
00290   if (PQresultStatus(raw_result) != PGRES_TUPLES_OK)
00291   {
00292     ROS_ERROR("Database get list: query failed. Error: %s", PQresultErrorMessage(raw_result));
00293     return false;
00294   }
00295   
00296   num_tuples = PQntuples(raw_result);
00297   if (!num_tuples) 
00298   {
00299     return true;
00300   }
00301   
00302   //store the column id's for each field we retrieved
00303   for (size_t t=0; t<fields.size(); t++)
00304   {
00305     int id =  PQfnumber(raw_result, fields[t]->getName().c_str());
00306     if (id < 0)
00307     {
00308       ROS_ERROR("Database get list: column %s missing in result", fields[t]->getName().c_str());
00309       return false;
00310     }
00311     column_ids.push_back(id);
00312   }   
00313   return true;
00314 }
00315 
00320 bool PostgresqlDatabase::populateListEntry(DBClass *entry, boost::shared_ptr<PGresultAutoPtr> result, 
00321                                                     int row_num,
00322                                                     const std::vector<const DBFieldBase*> &fields,
00323                                                     const std::vector<int> &column_ids) const
00324 {
00325   for (size_t t=0; t<fields.size(); t++)
00326   {
00327     const char* char_value =  PQgetvalue(**result, row_num, column_ids[t]);
00328     DBFieldBase *entry_field = entry->getField(fields[t]->getName());
00329     if (!entry_field)
00330     {
00331       ROS_ERROR("Database get list: new entry missing field %s", fields[t]->getName().c_str());
00332       return false;
00333     }
00334     if ( !entry_field->fromString(char_value) )
00335     {
00336       ROS_ERROR("Database get list: failed to parse response \"%s\" for field \"%s\"",   
00337                 char_value, fields[t]->getName().c_str()); 
00338       return false;
00339     }
00340   }
00341   return true;
00342 }
00343 
00350 bool PostgresqlDatabase::countList(const DBClass *example, int &count, std::string where_clause) const
00351 {
00352   const DBFieldBase* pk_field = example->getPrimaryKeyField();
00353   
00354   std::string query( "SELECT COUNT(" + pk_field->getName() + ") FROM " + pk_field->getTableName() );
00355   if (!where_clause.empty())
00356   {
00357     query += " WHERE " + where_clause;
00358   }
00359   query += ";";
00360 
00361   ROS_INFO("Query (count): %s", query.c_str());
00362   PGresultAutoPtr result( PQexec(connection_, query.c_str()) );
00363                          
00364   if (PQresultStatus(*result) != PGRES_TUPLES_OK)
00365   {
00366     ROS_ERROR("Database count list query failed. Error: %s", PQresultErrorMessage(*result));
00367     return false;
00368   }
00369   const char *reply =  PQgetvalue(*result, 0, 0);
00370   if (!DBStreamable<int>::streamableFromString(count, reply)) 
00371   {
00372     ROS_ERROR("Database count list failed. Could not understand reply: %s", reply);
00373     return false;
00374   }
00375   return true;  
00376 }
00377 
00384 bool PostgresqlDatabase::saveToDatabase(const DBFieldBase* field)
00385 {
00386   if (!field->getWritePermission())
00387   {
00388     ROS_ERROR("Database save field: field %s does not have write permission", field->getName().c_str());
00389     return false;
00390   }
00391 
00392   const DBFieldBase* key_field;
00393   if (field->getTableName() == field->getOwner()->getPrimaryKeyField()->getTableName()) 
00394   {
00395     key_field = field->getOwner()->getPrimaryKeyField();
00396   }
00397   else 
00398   {
00399     if (!field->getOwner()->getForeignKey(field->getTableName(), key_field))
00400     {
00401       ROS_ERROR("Database save field: could not find foreign key for table %s", 
00402                 field->getTableName().c_str());
00403       return false;
00404     }
00405     //here we could also check if the join is done on our primary key, and 
00406     //reject if not insted of using the write_permisison flag
00407   }
00408  
00409   //prepare query with parameters so we can use binary data if needed
00410 
00411   std::string query("UPDATE " + field->getTableName() + 
00412                     " SET " + field->getName() + "=$2"
00413                     " WHERE " + key_field->getName() + "=$1;");
00414 
00415   std::vector<const char*> param_values(2);
00416   std::vector<int> param_lengths(2);
00417   std::vector<int> param_formats(2);
00418   //first parameter is always text
00419   std::string id_str;
00420   if (!key_field->toString(id_str))
00421   {
00422     ROS_ERROR("Database save field: failed to convert key id value to string");
00423     return false;
00424   }
00425   param_formats[0] = 0;
00426   param_values[0] = id_str.c_str();
00427 
00428   //second parameter could be binary
00429   std::string value_str;
00430   if (field->getType() == DBFieldBase::TEXT)
00431   {
00432     if (!field->toString(value_str)) 
00433     {
00434       ROS_ERROR("Database save field: failed to convert field value to string");
00435       return false;
00436     }
00437     param_formats[1] = 0;
00438     param_values[1] = value_str.c_str();
00439   }
00440   else if (field->getType() == DBFieldBase::BINARY)
00441   {
00442     size_t length;
00443     if (!field->toBinary(param_values[1], length))
00444     {
00445       ROS_ERROR("Database save field: failed to convert field value to binary");
00446       return false;
00447     }
00448     param_lengths[1] = length;
00449     param_formats[1] = 1;
00450   }
00451   else
00452   {
00453     ROS_ERROR("Database save field: unkown field type");
00454     return false;
00455   }
00456 
00457   //ROS_INFO("Save field query: %s $1=%s, $2=%s", query.c_str(), param_values[0], param_values[1]);
00458 
00459   PGresultAutoPtr result( PQexecParams(connection_, query.c_str(), 2, 
00460                                        NULL, &param_values[0], &param_lengths[0], &param_formats[0], 0) );
00461   if (PQresultStatus(*result) != PGRES_COMMAND_OK)
00462   {
00463     ROS_ERROR("Database save field: query failed. Error: %s", PQresultErrorMessage(*result));
00464     return false;
00465   }
00466   return true;
00467 }
00468 
00474 bool PostgresqlDatabase::loadFromDatabase(DBFieldBase* field) const
00475 {
00476   const DBFieldBase* key_field = NULL;
00477   if (field->getTableName() == field->getOwner()->getPrimaryKeyField()->getTableName())
00478   {
00479     key_field = field->getOwner()->getPrimaryKeyField();
00480   }
00481   else 
00482   {
00483     if (!field->getOwner()->getForeignKey(field->getTableName(), key_field))
00484     {
00485       ROS_ERROR("Database load field: could not find foreign key for table %s", 
00486                 field->getTableName().c_str());
00487       return false;
00488     }
00489   }
00490 
00491   std::string id_str;
00492   if (!key_field->toString(id_str))
00493   {
00494     ROS_ERROR("Database load field: failed to convert key id value to string");
00495     return false;
00496   }
00497 
00498   std::string query("SELECT " + field->getName() + " FROM " + field->getTableName() + 
00499                     " WHERE " + key_field->getName() + " ='" + id_str + "';");
00500 
00501   //ROS_INFO_STREAM("Load field query: " << query);
00502 
00503   int data_type;
00504   if (field->getType() == DBFieldBase::TEXT) data_type = 0;
00505   else if (field->getType() == DBFieldBase::BINARY) data_type = 1;
00506   else
00507   {
00508     ROS_ERROR("Database load field: unkown field type");
00509     return false;
00510   }
00511 
00512   PGresultAutoPtr result( PQexecParams(connection_, query.c_str(), 0, NULL, NULL, NULL, NULL, data_type) );
00513   if (PQresultStatus(*result) != PGRES_TUPLES_OK)
00514   {
00515     ROS_ERROR("Database load field: query failed. Error: %s", PQresultErrorMessage(*result));
00516     return false;
00517   }
00518 
00519   if (PQntuples(*result)==0)
00520   {
00521     ROS_ERROR("Database load field: no entry found for key value %s on column %s", 
00522               id_str.c_str(), key_field->getName().c_str());
00523     return false;
00524   }
00525 
00526   const char* result_char =  PQgetvalue(*result, 0, 0);
00527   if (field->getType() == DBFieldBase::TEXT)
00528   {
00529     if ( !field->fromString(result_char) )
00530     {
00531       ROS_ERROR("Database load field: failed to parse text result \"%s\" for field \"%s\"", 
00532                 result_char, field->getName().c_str()); 
00533       return false;
00534     }
00535   } 
00536   else if (field->getType() == DBFieldBase::BINARY)
00537   {
00538     size_t length = PQgetlength(*result, 0, 0);
00539     if (!field->fromBinary(result_char, length))
00540     {
00541       ROS_ERROR("Database load field: failed to parse binary result length %d for field \"%s\"",
00542                 (int) length, field->getName().c_str()); 
00543       return false;
00544     }
00545   }
00546   else
00547   {
00548     ROS_ERROR("Database load field: failed to parse unkown field type");
00549     return false;
00550   }
00551 
00552   return true;
00553 }
00554 
00563 bool PostgresqlDatabase::insertIntoTable(std::string table_name,
00564                                                   const std::vector<const DBFieldBase*> &fields)
00565 {
00566   if (fields.empty())
00567   {
00568     ROS_ERROR("Insert into table: no columns to insert");
00569     return false;
00570   }
00571 
00572   std::string query("INSERT INTO " + table_name + "(");
00573 
00574   //the first field might be the foreign key
00575   if (table_name == fields[0]->getTableName())
00576   {
00577     //it's not, just any other field
00578     query += fields[0]->getName();
00579   }
00580   else
00581   {
00582     //we are inserting the foreign key, which for now is always assumed to be referencing our primary key
00583     const DBFieldBase *foreign_key = NULL;
00584     if (!fields[0]->getOwner()->getForeignKey(table_name, foreign_key))
00585     {
00586       ROS_ERROR("Database insert into table: could not find foreign key for table %s", table_name.c_str());
00587       return false;
00588     }
00589     query += foreign_key->getName();
00590   }
00591 
00592   for(size_t i=1; i<fields.size(); i++)
00593   {
00594     query += "," + fields[i]->getName();
00595   }
00596   query += ")";
00597 
00598   query += "VALUES(";
00599   for (size_t i=0; i<fields.size(); i++)
00600   {
00601     if ( i!=0 && table_name!=fields[i]->getTableName())
00602     {
00603       ROS_ERROR("Database insert into table: field table does not match table name");
00604       return false;
00605     }
00606     if (i!=0) query += ",";
00607     std::ostringstream ss;
00608     ss << i+1;
00609     query += "$" + ss.str();
00610   }
00611   query += ");";
00612   
00613   //ROS_INFO("Query: %s", query.c_str());
00614 
00615   //now prepare the arguments
00616   std::vector<std::string> param_strings(fields.size());
00617   std::vector<const char*> param_values(fields.size());
00618   std::vector<int> param_lengths(fields.size());
00619   std::vector<int> param_formats(fields.size());
00620   for (size_t i=0; i<fields.size(); i++)
00621   {
00622     if (fields[i]->getType() == DBFieldBase::TEXT)
00623     {
00624       if (!fields[i]->toString(param_strings[i]))
00625       {
00626         ROS_ERROR("Database insert into table: could not parse field %s", fields[i]->getName().c_str());
00627         return false;
00628       }
00629       param_values[i] = param_strings[i].c_str();
00630       param_formats[i] = 0;
00631     }
00632     else if (fields[i]->getType() == DBFieldBase::BINARY)
00633     {
00634       size_t length;
00635       if (!fields[i]->toBinary(param_values[i], length))
00636       {
00637         ROS_ERROR("Database insert into table: could not binarize field %s", fields[i]->getName().c_str());
00638         return false;
00639       }
00640       param_lengths[i] = length;
00641       param_formats[i] = 1;
00642     }
00643     else
00644     {
00645       ROS_ERROR("Database insert into table: unknown field type");
00646       return false;
00647     }
00648   }
00649 
00650   //and send the query
00651   PGresultAutoPtr result( PQexecParams(connection_, query.c_str(), fields.size(), NULL, 
00652                                        &(param_values[0]), &(param_lengths[0]), &(param_formats[0]), 0) );
00653   if (PQresultStatus(*result) != PGRES_COMMAND_OK)
00654   {
00655     ROS_ERROR("Database insert into table: query failed.\nError: %s.\nQuery: %s",
00656               PQresultErrorMessage(*result), query.c_str());
00657     return false;
00658   }
00659 
00660   return true;
00661 }
00662 
00669 bool PostgresqlDatabase::insertIntoDatabase(DBClass* instance)
00670 {
00671   //primary key must be text; its table is first
00672   DBFieldBase* pk_field = instance->getPrimaryKeyField();
00673   if (pk_field->getType() != DBFieldBase::TEXT)
00674   {
00675     ROS_ERROR("Database insert: cannot insert binary primary key %s", pk_field->getName().c_str());
00676     return false;
00677   }
00678 
00679   //make lists of which fields go in which table
00680   std::vector<std::string> table_names;
00681   std::vector< std::vector<const DBFieldBase*> > table_fields;
00682 
00683   //the first table we must insert into always belongs to the primary key
00684   table_names.push_back(pk_field->getTableName());
00685   table_fields.push_back( std::vector<const DBFieldBase*>() );
00686   
00687   bool insert_pk;
00688   if (pk_field->getWriteToDatabase())
00689   {
00690     //we will explicitly insert the primary key in its own table
00691     table_fields.back().push_back(pk_field);
00692     insert_pk = true;
00693   }
00694   else
00695   {
00696     //we do not insert the primary key in its own table; presumably it has a 
00697     //default value from a sequence which we will retrieve afterwards
00698     if (pk_field->getSequenceName().empty())
00699     {
00700       ROS_ERROR("Database insert: attempt to insert instance without primary key and no sequence for it");
00701       return false;
00702     }
00703     insert_pk = false;
00704   }
00705   
00706   //prepare insertions into other tables
00707   //note that even if we are inserting no data, we still need to make an entry
00708   //containing the foreign key so we have it for future insertions
00709   for (size_t i=0; i<instance->getNumFields(); i++)
00710   {
00711     //see if we are already inserting in this table
00712     bool found = false;
00713     size_t t;
00714     for (t=0; t<table_names.size(); t++)
00715     {
00716       if ( table_names[t] == instance->getField(i)->getTableName() )
00717       {
00718         found = true;
00719         break;
00720       }
00721     }
00722     if (!found)
00723     {
00724       //if we are not joining on our primary key, we should not be inserting in this table
00725       const DBFieldBase* foreign_key = NULL;
00726       if (!instance->getForeignKey(instance->getField(i)->getTableName(), foreign_key))
00727       {
00728         ROS_ERROR("Database insert into table: could not find foreign key for table %s", 
00729           instance->getField(i)->getTableName().c_str());
00730         return false;
00731       }
00732       if (foreign_key != pk_field) 
00733       {
00734         continue;
00735       }
00736       //we are joining on primary key, so we will need to insert it in other table as well
00737       table_names.push_back( instance->getField(i)->getTableName() );
00738       table_fields.push_back( std::vector<const DBFieldBase*>() );
00739       //in all the other tables we must explicitly insert the value of our 
00740       //primary key as it is the foreign key in all other tables
00741       table_fields.back().push_back(pk_field);
00742       t = table_names.size() - 1;
00743     }
00744     if ( !instance->getField(i)->getWriteToDatabase() ) continue;
00745     if ( instance->getField(i)->getType() != DBFieldBase::TEXT )
00746     {
00747       ROS_WARN("Database insert: cannot insert binary field %s in database", 
00748         instance->getField(i)->getName().c_str());
00749       continue;
00750     }
00751     //insert the field itself
00752     table_fields[t].push_back(instance->getField(i));
00753   }
00754   
00755   //BEGIN transaction
00756   if (!begin()) return false;
00757 
00758   //first we insert into the primary key's table
00759   if (!insertIntoTable(table_names[0], table_fields[0])) 
00760   {
00761     rollback();
00762     return false;
00763   }
00764 
00765   //if we have to, retrieve the primary key
00766   if (!insert_pk)
00767   {
00768     std::string sequence_value;
00769     if (!getSequence(pk_field->getSequenceName(), sequence_value) || !pk_field->fromString(sequence_value))
00770     {
00771       ROS_ERROR("Database insert: failed to retrieve primary key after insertion");
00772       rollback();
00773       return false;
00774     }
00775   }
00776 
00777   //insert into the rest of the tables
00778   for (size_t i=1; i<table_names.size(); i++)
00779   {
00780     if (!insertIntoTable(table_names[i], table_fields[i]))
00781     {
00782       rollback();
00783       return false;
00784     }
00785   }
00786 
00787   //COMMIT transaction
00788   if (!commit()) return false;
00789 
00790   return true;
00791 }
00792 
00794 bool PostgresqlDatabase::deleteFromTable(std::string table_name, const DBFieldBase *key_field)
00795 {
00796   std::string id_str;
00797   if (!key_field->toString(id_str))
00798   {
00799     ROS_ERROR("Database delete from table: failed to convert key id value to string");
00800     return false;
00801   }
00802 
00803   std::string query("DELETE FROM " + table_name + " WHERE " + key_field->getName() + "=" + id_str);
00804   PGresultAutoPtr result( PQexec(connection_, query.c_str()) );
00805   if (PQresultStatus(*result) != PGRES_COMMAND_OK)
00806   {
00807     ROS_ERROR("Database delete from table: query failed. Error: %s", PQresultErrorMessage(*result));
00808     return false;
00809   }
00810   return true;
00811 }
00812 
00816 bool PostgresqlDatabase::deleteFromDatabase(DBClass* instance)
00817 {
00818   std::vector<std::string> table_names;
00819   std::vector<const DBFieldBase*> table_fields;
00820   DBFieldBase* pk_field = instance->getPrimaryKeyField();
00821   //the table of the primary key
00822   table_names.push_back( pk_field->getTableName() );
00823   table_fields.push_back( pk_field );
00824   //prepare deletions from the other tables
00825   for (size_t i=0; i<instance->getNumFields(); i++)
00826   {
00827     //see if we are already deleting in this table
00828     size_t t;
00829     for (t=0; t<table_names.size(); t++)
00830     {
00831       if ( table_names[t] == instance->getField(i)->getTableName() ) break;
00832     }
00833     if (t<table_names.size()) continue;
00834 
00835     //if we are not joining on our primary key, we should NOT be deleting from this table
00836     const DBFieldBase* foreign_key = NULL;
00837     if (!instance->getForeignKey(instance->getField(i)->getTableName(), foreign_key))
00838     {
00839       ROS_ERROR("Database insert into table: could not find foreign key for table %s", 
00840                 instance->getField(i)->getTableName().c_str());
00841       return false;
00842     }
00843     if (foreign_key != pk_field) continue;
00844 
00845     //we are joining on primary key, so we will need to delete the row in the other table as well
00846     table_names.push_back( instance->getField(i)->getTableName() );
00847     table_fields.push_back( pk_field );
00848   }
00849 
00850   //BEGIN transaction
00851   if (!begin()) return false;
00852   
00853   //delete from all tables, but primary key table goes last
00854   for (int i=(int)table_names.size()-1; i>=0; i--)
00855   {
00856     if (!deleteFromTable(table_names[i], table_fields[i]))
00857     {
00858       rollback();
00859       return false;
00860     }
00861   }
00862 
00863   //COMMIT transaction
00864   if (!commit()) return false;
00865 
00866   return true;
00867 
00868 }
00869 
00871 bool PostgresqlDatabase::listenToChannel(std::string channel) {
00872   std::string query = "LISTEN " + channel;
00873   PGresultAutoPtr result = PQexec(connection_,query.c_str());
00874   if (PQresultStatus(*result) != PGRES_COMMAND_OK)
00875       {
00876           ROS_WARN("LISTEN command failed: %s", PQerrorMessage(connection_));
00877           return false;
00878       }
00879   ROS_INFO("Now listening to channel \"%s\"",channel.c_str());
00880   return true;
00881 }
00882 
00884 bool PostgresqlDatabase::unlistenToChannel(std::string channel)
00885 {
00886   std::string query = "UNLISTEN " + channel + " ;";
00887   PGresultAutoPtr result = PQexec(connection_,query.c_str());
00888   if (PQresultStatus(*result) != PGRES_COMMAND_OK)
00889   {
00890     ROS_WARN("UNLISTEN command failed: %s", PQerrorMessage(connection_));
00891     return false;
00892   }
00893   ROS_INFO("Not listening to channel \"%s\" anymore.",channel.c_str());
00894   return true;
00895 }
00896 
00901 bool PostgresqlDatabase::checkNotify(Notification &no)
00902 {
00903   PGnotify *notify;
00904   //check for a notify on the connection
00905   if (!PQconsumeInput(connection_))
00906   {
00907     ROS_ERROR("Consume input failed with error message: %s", PQerrorMessage(connection_));
00908     return false;
00909   }
00910   //deal with the received object
00911   if ((notify = PQnotifies(connection_)) != NULL)
00912   {
00913     no.channel = notify->relname;
00914     no.sending_pid = notify->be_pid;
00915     no.payload = notify->extra;
00916     PQfreemem(notify);
00917   }
00918   else
00919   {
00920     no.channel = "";
00921     no.sending_pid = 0;
00922     no.payload = "";
00923     PQfreemem(notify);
00924   }
00925   return true;
00926 }
00927 
00929 bool PostgresqlDatabase::waitForNotify(Notification &no)
00930 {
00931   int sock;
00932   fd_set input_mask;
00933   while (true)
00934   {
00935     // Sleep until something happens on the connection.
00936     sock = PQsocket(connection_);     
00937     if (sock < 0)
00938     {
00939       break;
00940     }
00941     FD_ZERO(&input_mask);
00942     FD_SET(sock, &input_mask);
00943 
00944     if (select(sock + 1, &input_mask, NULL, NULL, NULL) < 0)
00945     {
00946       ROS_WARN("Select() on the database connection failed: %s\n", strerror(errno));
00947       break;
00948     }
00949     // Check for input
00950     if (!checkNotify(no)) return false;
00951     // Exit if we have a notification
00952     if (no.sending_pid != 0) return true;
00953   }
00954   return false;
00955 }
00956 
00957 }//namespace


sql_database
Author(s): Matei Ciocarlie and Lorenz Mosenlechner
autogenerated on Fri Aug 28 2015 13:11:16