parallel_solve.cpp
//
// Copyright (c) 2010-2012, Benjamin Kaufmann
//
// This file is part of Clasp. See http://www.cs.uni-potsdam.de/clasp/
//
// Clasp is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// Clasp is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Clasp; if not, write to the Free Software
// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
//
#if WITH_THREADS
#include <clasp/parallel_solve.h>
#include <clasp/solver.h>
#include <clasp/clause.h>
#include <clasp/enumerator.h>
#include <clasp/util/timer.h>
#include <clasp/minimize_constraint.h>
#include <clasp/util/mutex.h>
#include <tbb/concurrent_queue.h>
namespace Clasp { namespace mt {
/////////////////////////////////////////////////////////////////////////////////////////
// BarrierSemaphore
/////////////////////////////////////////////////////////////////////////////////////////
// A combination of a barrier and a semaphore
class BarrierSemaphore {
public:
	explicit BarrierSemaphore(int counter = 0, int maxParties = 1) : counter_(counter), active_(maxParties) {}
	// Initializes this object
	// PRE: no thread is blocked on the semaphore
	//      (i.e. internal counter is >= 0)
	// NOTE: not thread-safe
	void unsafe_init(int counter = 0, int maxParties = 1) {
		counter_ = counter;
		active_  = maxParties;
	}
	// Returns the current semaphore counter.
	int  counter()   { lock_guard<mutex> lock(semMutex_); return counter_; }
	// Returns the number of parties required to trip this barrier.
	int  parties()   { lock_guard<mutex> lock(semMutex_); return active_;  }
	// Returns true if all parties are waiting at the barrier.
	bool active()    { lock_guard<mutex> lock(semMutex_); return unsafe_active(); }

	// barrier interface

	// Increases the barrier count, i.e. the number of
	// parties required to trip this barrier.
	void addParty() {
		lock_guard<mutex> lock(semMutex_);
		++active_;
	}
	// Decreases the barrier count and resets the barrier
	// if reset is true.
	// PRE: the thread does not itself wait on the barrier
	void removeParty(bool reset) {
		unique_lock<mutex> lock(semMutex_);
		assert(active_ > 0);
		--active_;
		if      (reset)           { unsafe_reset(0); }
		else if (unsafe_active()) { counter_ = -active_; lock.unlock(); semCond_.notify_one(); }
	}
	// Waits until all parties have arrived, i.e. called wait.
	// Exactly one of the parties will receive a return value of true,
	// the others will receive a value of false.
	// Applications shall use this value to designate one thread as the
	// leader that will eventually reset the barrier, thereby unblocking the other threads.
	bool wait() {
		unique_lock<mutex> lock(semMutex_);
		if (--counter_ >= 0) { counter_ = -1; }
		return unsafe_wait(lock);
	}
	// Resets the barrier and unblocks any waiting threads.
	void reset(uint32 semCount = 0) {
		lock_guard<mutex> lock(semMutex_);
		unsafe_reset(semCount);
	}
	// semaphore interface

	// Decrements the semaphore's counter.
	// If the counter is zero or less prior to the call,
	// the calling thread is suspended.
	// Returns false to signal that all but the calling thread
	// are currently blocked.
	bool down() {
		unique_lock<mutex> lock(semMutex_);
		if (--counter_ >= 0) { return true; }
		return !unsafe_wait(lock);
	}
	// Increments the semaphore's counter and resumes
	// one thread blocked in down() if the counter
	// was less than zero prior to the call.
	void up() {
		bool notify;
		{
			lock_guard<mutex> lock(semMutex_);
			notify    = (++counter_ < 1);
		}
		if (notify) semCond_.notify_one();
	}
private:
	BarrierSemaphore(const BarrierSemaphore&);
	BarrierSemaphore& operator=(const BarrierSemaphore&);
	typedef condition_variable cv;
	bool    unsafe_active() const { return -counter_ >= active_; }
	void    unsafe_reset(uint32 semCount) {
		int prev = counter_;
		counter_ = semCount;
		if (prev < 0) { semCond_.notify_all(); }
	}
	// Returns true for the leader, else false
	bool    unsafe_wait(unique_lock<mutex>& lock) {
		assert(counter_ < 0);
		// don't put the last thread to sleep!
		if (!unsafe_active()) {
			semCond_.wait(lock);
		}
		return unsafe_active();
	}
	cv    semCond_;  // waiting threads
	mutex semMutex_; // mutex for updating counter
	int   counter_;  // semaphore's counter
	int   active_;   // number of active threads
};
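// Illustrative usage sketch (added; not part of Clasp): one barrier round.
// Each party eventually calls wait(); exactly one of them sees true and acts
// as the leader that completes the pending request and unblocks the others:
//
//	if (sem.wait()) {       // sem is a BarrierSemaphore shared by all parties
//		/* leader: finalize the synchronized state here */
//		sem.reset();    // release the non-leaders
//	}
//	// non-leaders resume here once the leader has called reset()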
/////////////////////////////////////////////////////////////////////////////////////////
// ParallelSolve::Impl
/////////////////////////////////////////////////////////////////////////////////////////
struct ParallelSolve::SharedData {
	typedef tbb::concurrent_queue<const LitVec*> queue;
	enum MsgFlag {
		terminate_flag        = 1u,   sync_flag     = 2u,  split_flag = 4u,
		restart_flag          = 8u,   complete_flag = 16u,
		interrupt_flag        = 32u,  // set on terminate from outside
		allow_split_flag      = 64u,  // set if splitting mode is active
		forbid_restart_flag   = 128u, // set if restarts are no longer allowed
		cancel_restart_flag   = 256u, // set if current restart request was cancelled by some thread
		restart_abandoned_flag= 512u, // set to signal that threads must not give up their gp
	};
	enum Message {
		msg_terminate      = (terminate_flag),
		msg_interrupt      = (terminate_flag | interrupt_flag),
		msg_sync_restart   = (sync_flag | restart_flag),
		msg_split          = split_flag
	};
	SharedData() : path(0) { reset(0); control = 0; }
	void reset(SharedContext* a_ctx) {
		clearQueue();
		syncT.reset();
		workSem.unsafe_init(0, a_ctx ? a_ctx->concurrency() : 0);
		globalR.reset();
		maxConflict = globalR.current();
		error       = 0;
		initMask    = 0;
		ctx         = a_ctx;
		path        = 0;
		nextId      = 1;
		workReq     = 0;
		restartReq  = 0;
	}
	void clearQueue() {
		for (const LitVec* a = 0; workQ.try_pop(a); ) {
			if (a != path) { delete a; }
		}
	}
	const LitVec* requestWork(uint32 id) {
		// try to get initial path
		uint64 m(uint64(1) << id);
		uint64 init = initMask;
		if ((m & init) == m) { initMask -= m; return path; }
		// try to get path from split queue
		const LitVec* res = 0;
		if (workQ.try_pop(res)) { return res; }
		return 0;
	}
	// MESSAGES
	bool        hasMessage()  const { return (control & uint32(7)) != 0; } // terminate-, sync-, or split-flag set?
	bool        synchronize() const { return (control & uint32(sync_flag))      != 0; }
	bool        terminate()   const { return (control & uint32(terminate_flag)) != 0; }
	bool        split()       const { return (control & uint32(split_flag))     != 0; }
	bool        postMessage(Message m, bool notify);
	void        aboutToSplit()      { if (--workReq == 0) updateSplitFlag();  }
	void        updateSplitFlag();
	// CONTROL FLAGS
	bool        hasControl(uint32 f) const { return (control & f) != 0;        }
	bool        interrupt()          const { return hasControl(interrupt_flag);}
	bool        complete()           const { return hasControl(complete_flag); }
	bool        restart()            const { return hasControl(restart_flag);  }
	bool        allowSplit()         const { return hasControl(allow_split_flag); }
	bool        allowRestart()       const { return !hasControl(forbid_restart_flag); }
	// Atomically sets the given flags; returns true if at least one of them was not yet set.
	bool        setControl(uint32 flags)   { return (fetch_and_or(control, flags) & flags) != flags; }
	// Atomically clears the given flags; returns true if all of them were previously set.
	bool        clearControl(uint32 flags) { return (fetch_and_and(control, ~flags) & flags) == flags; }
	ScheduleStrategy globalR;     // global restart strategy
	uint64           maxConflict; // current restart limit
	uint64           error;       // bitmask of erroneous solvers
	SharedContext*   ctx;         // shared context object
	const LitVec*    path;        // initial guiding path - typically empty
	atomic<uint64>   initMask;    // bitmask for initialization
	Timer<RealTime>  syncT;       // thread sync time
	mutex            modelM;      // model-mutex
	BarrierSemaphore workSem;     // work-semaphore
	queue            workQ;       // work-queue
	uint32           nextId;      // next solver id to use
	atomic<int>      workReq;     // > 0: someone needs work
	atomic<uint32>   restartReq;  // == numThreads(): restart
	atomic<uint32>   control;     // set of active message flags
	atomic<uint32>   modCount;    // counter for synchronizing models
};

// post message to all threads
bool ParallelSolve::SharedData::postMessage(Message m, bool notifyWaiting) {
	if (m == msg_split) {
		if (++workReq == 1) { updateSplitFlag(); }
		return true;
	}
	else if (setControl(m)) {
		// control message - notify all if requested
		if (notifyWaiting) workSem.reset();
		if ((uint32(m) & uint32(sync_flag|terminate_flag)) != 0) {
			syncT.reset();
			syncT.start();
		}
		return true;
	}
	return false;
}

void ParallelSolve::SharedData::updateSplitFlag() {
	for (bool splitF;;) {
		splitF = (workReq > 0);
		if (split() == splitF) return;
		if (splitF) fetch_and_or(control,   uint32(split_flag));
		else        fetch_and_and(control, ~uint32(split_flag));
	}
}
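// Note (added for clarity): updateSplitFlag() is a lock-free retry loop.
// workReq and control are separate atomics, so the loop re-checks split()
// against the current demand until flag and demand agree; a concurrent
// aboutToSplit() or postMessage(msg_split) merely triggers another iteration.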
/////////////////////////////////////////////////////////////////////////////////////////
// ParallelSolve
/////////////////////////////////////////////////////////////////////////////////////////
ParallelSolve::ParallelSolve(Enumerator* e, const ParallelSolveOptions& opts)
	: SolveAlgorithm(e, opts.limit)
	, shared_(new SharedData)
	, thread_(0)
	, distribution_(opts.distribute)
	, maxRestarts_(0)
	, intGrace_(1024)
	, intTopo_(opts.integrate.topo)
	, intFlags_(ClauseCreator::clause_not_root_sat | ClauseCreator::clause_no_add)
	, initialGp_(opts.algorithm.mode == ParallelSolveOptions::Algorithm::mode_split ? gp_split : gp_fixed) {
	setRestarts(opts.restarts.maxR, opts.restarts.sched);
	setIntegrate(opts.integrate.grace, opts.integrate.filter);
}

ParallelSolve::~ParallelSolve() {
	if (shared_->nextId > 1) {
		// the algorithm was not shut down properly and
		// threads may still be active - force an orderly shutdown
		ParallelSolve::doInterrupt();
		shared_->workSem.removeParty(true);
		joinThreads();
	}
	destroyThread(masterId);
	delete shared_;
}

bool ParallelSolve::beginSolve(SharedContext& ctx) {
	assert(ctx.concurrency() && "Illegal number of threads");
	if (shared_->terminate()) { return false; }
	shared_->reset(&ctx);
	shared_->setControl(initialGp_ == gp_split ? SharedData::allow_split_flag : SharedData::forbid_restart_flag);
	shared_->setControl(SharedData::sync_flag); // force initial sync with all threads
	shared_->modCount = uint32(enumerator().optimize());
	if (distribution_.types != 0 && ctx.distributor.get() == 0) {
		ctx.distributor.reset(new mt::GlobalQueue(distribution_, ctx.concurrency(), intTopo_));
	}
	reportProgress(MessageEvent(*ctx.master(), "SYNC", MessageEvent::sent));
	for (uint32 i = 1; i != ctx.concurrency(); ++i) {
		uint32 id = shared_->nextId++;
		allocThread(id, *ctx.solver(id), ctx.configuration()->search(id));
		// start in new thread
		Clasp::thread x(std::mem_fun(&ParallelSolve::solveParallel), this, id);
		thread_[id]->setThread(x);
	}
	return true;
}

void ParallelSolve::setIntegrate(uint32 grace, uint8 filter) {
	typedef ParallelSolveOptions::Integration Dist;
	intGrace_     = grace;
	intFlags_     = ClauseCreator::clause_no_add;
	if (filter == Dist::filter_heuristic) { store_set_bit(intFlags_, 31); }
	if (filter != Dist::filter_no)        { intFlags_ |= ClauseCreator::clause_not_root_sat; }
	if (filter == Dist::filter_sat)       { intFlags_ |= ClauseCreator::clause_not_sat; }
}

void ParallelSolve::setRestarts(uint32 maxR, const ScheduleStrategy& rs) {
	maxRestarts_         = maxR;
	shared_->globalR     = maxR ? rs : ScheduleStrategy::none();
	shared_->maxConflict = shared_->globalR.current();
}

uint32 ParallelSolve::numThreads() const { return shared_->workSem.parties(); }

void ParallelSolve::allocThread(uint32 id, Solver& s, const SolveParams& p) {
	if (!thread_) {
		uint32 n = numThreads();
		thread_  = new ParallelHandler*[n];
		std::fill(thread_, thread_+n, static_cast<ParallelHandler*>(0));
	}
	CLASP_PRAGMA_TODO("replace with CACHE_LINE_ALIGNED alloc")
	uint32 b   = ((sizeof(ParallelHandler)+63) / 64) * 64;
	thread_[id]= new (::operator new( b )) ParallelHandler(*this, s, p);
}
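// Note (added for clarity): `b` rounds the handler size up to the next
// multiple of 64 bytes, a typical cache-line size, so that handlers owned by
// different threads are unlikely to share a cache line (false sharing) until
// a properly aligned allocator replaces this interim scheme.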

void ParallelSolve::destroyThread(uint32 id) {
	if (thread_ && thread_[id]) {
		assert(!thread_[id]->joinable() && "Shutdown not completed!");
		thread_[id]->~ParallelHandler();
		::operator delete(thread_[id]);
		thread_[id] = 0;
		if (id == masterId) {
			delete [] thread_;
			thread_ = 0;
		}
	}
}

inline void ParallelSolve::reportProgress(const Event& ev) const {
	return shared_->ctx->report(ev);
}

// joins with and destroys all active threads
void ParallelSolve::joinThreads() {
	int    ec     = thread_[masterId]->error();
	uint32 winner = thread_[masterId]->winner() ? uint32(masterId) : UINT32_MAX;
	shared_->error= ec ? 1 : 0;
	for (uint32 i = 1, end = shared_->nextId; i != end; ++i) {
		if (thread_[i]->join() != 0) {
			shared_->error |= uint64(1) << i;
			ec = std::max(ec, thread_[i]->error());
		}
		if (thread_[i]->winner() && i < winner) {
			winner = i;
		}
		destroyThread(i);
	}
	// detach master only after all client threads are done
	thread_[masterId]->detach(*shared_->ctx, shared_->interrupt());
	thread_[masterId]->setError(!shared_->interrupt() ? thread_[masterId]->error() : ec);
	shared_->ctx->setWinner(winner);
	shared_->nextId = 1;
	shared_->syncT.stop();
	reportProgress(MessageEvent(*shared_->ctx->master(), "TERMINATE", MessageEvent::completed, shared_->syncT.total()));
}

// Entry point for master solver
bool ParallelSolve::doSolve(SharedContext& ctx, const LitVec& path) {
	if (beginSolve(ctx)) {
		Solver& s = *ctx.master();
		assert(s.id() == masterId);
		allocThread(masterId, s, ctx.configuration()->search(masterId));
		shared_->path = &path;
		shared_->syncT.start();
		solveParallel(masterId);
		reportProgress(message(Event::subsystem_solve, "Joining with other threads", &s));
		joinThreads();
		int err = thread_[masterId]->error();
		destroyThread(masterId);
		shared_->ctx->distributor.reset(0);
		switch(err) {
			case error_none   : break;
			case error_oom    : throw std::bad_alloc();
			case error_runtime: throw std::runtime_error("RUNTIME ERROR!");
			default:            throw std::runtime_error("UNKNOWN ERROR!");
		}
	}
	return !shared_->complete();
}

// main solve loop executed by all threads
void ParallelSolve::solveParallel(uint32 id) {
	Solver& s           = thread_[id]->solver();
	const SolveParams& p= thread_[id]->params();
	SolveLimits lim     = limits();
	SolverStats agg;
	PathPtr a(0);
	try {
		// establish solver<->handler connection and attach to shared context
		// should this fail because of an initial conflict, we'll terminate
		// in requestWork.
		thread_[id]->attach(*shared_->ctx);
		BasicSolve solve(s, p, &lim);
		agg.enableStats(s.stats);
		for (GpType t; requestWork(s, a); solve.reset()) {
			agg.accu(s.stats);
			s.stats.reset();
			thread_[id]->setGpType(t = a.is_owner() ? gp_split : initialGp_);
			if (enumerator().start(s, *a, a.is_owner()) && thread_[id]->solveGP(solve, t, shared_->maxConflict) == value_free) {
				terminate(s, false);
			}
			s.clearStopConflict();
			enumerator().end(s);
		}
	}
	catch (const std::bad_alloc&)  { exception(id,a,error_oom, "ERROR: std::bad_alloc" ); }
	catch (const std::exception& e){ exception(id,a,error_runtime, e.what()); }
	catch (...)                    { exception(id,a,error_other, "ERROR: unknown");  }
	assert(shared_->terminate() || thread_[id]->error() != error_none);
	// this thread is leaving
	shared_->workSem.removeParty(shared_->terminate());
	// update stats
	s.stats.accu(agg);
	if (id != masterId) {
		// remove solver<->handler connection and detach from shared context
		// note: since detach can change the problem, we must not yet detach the master
		// because some client might still be in the middle of an attach operation
		thread_[id]->detach(*shared_->ctx, shared_->interrupt());
		s.stats.addCpuTime(ThreadTime::getTime());
	}
}

void ParallelSolve::exception(uint32 id, PathPtr& path, ErrorCode e, const char* what) {
	try {
		reportProgress(message(Event::subsystem_solve, what, &thread_[id]->solver()));
		thread_[id]->setError(e);
		if (id == masterId || shared_->workSem.active()) {
			ParallelSolve::doInterrupt();
			return;
		}
		else if (path.get() && shared_->allowSplit()) {
			shared_->workQ.push(path.release());
			shared_->workSem.up();
		}
		reportProgress(warning(Event::subsystem_solve, "Thread failed and was removed.", &thread_[id]->solver()));
	}
	catch(...) { ParallelSolve::doInterrupt(); }
}

// forced termination from outside
bool ParallelSolve::doInterrupt() {
	// do not notify blocked threads to avoid a possible
	// deadlock in the semaphore!
	shared_->postMessage(SharedData::msg_interrupt, false);
	return true;
}

// tries to get new work for the given solver
bool ParallelSolve::requestWork(Solver& s, PathPtr& out) {
	const LitVec* a = 0;
	while (!shared_->terminate()) {
		// only clear path and stop conflict - we don't propagate() here
		// because we would then have to handle any resulting conflicts
		if (!s.popRootLevel(s.rootLevel())) {
			// s has a real top-level conflict - problem is unsat
			terminate(s, true);
		}
		else if (shared_->synchronize()) {
			// a synchronize request is active - we are fine with
			// this but have not yet had a chance to react to it
			waitOnSync(s);
		}
		else if (a || (a = shared_->requestWork(s.id())) != 0) {
			assert(s.decisionLevel() == 0);
			// got new work from work-queue
			out = a;
			// do not take over ownership of initial gp!
			if (a == shared_->path) { out.release(); }
			// propagate any new facts before starting new work
			if (s.simplify())       { return true; }
			// s now has a conflict - either an artificial stop conflict
			// or a real conflict - we'll handle it in the next iteration
			// via the call to popRootLevel()
		}
		else if (shared_->allowSplit()) {
			// gp mode is active - request a split
			// and wait until someone has work for us
			reportProgress(MessageEvent(s, "SPLIT", MessageEvent::sent));
			shared_->postMessage(SharedData::msg_split, false);
			if (!shared_->workSem.down() && !shared_->synchronize()) {
				// we are the last man standing, there is no
				// work left - quitting time?
				terminate(s, true);
			}
		}
		else {
			// portfolio mode is active - no splitting, no work left
			// quitting time?
			terminate(s, true);
		}
	}
	return false;
}

// termination initiated from inside the algorithm -
// check if there is more to do
void ParallelSolve::terminate(Solver& s, bool complete) {
	if (!shared_->terminate()) {
		if (enumerator().tentative() && complete) {
			if (shared_->setControl(SharedData::sync_flag|SharedData::complete_flag)) {
				thread_[s.id()]->setWinner();
				reportProgress(MessageEvent(s, "SYNC", MessageEvent::sent));
			}
		}
		else {
			reportProgress(MessageEvent(s, "TERMINATE", MessageEvent::sent));
			shared_->postMessage(SharedData::msg_terminate, true);
			thread_[s.id()]->setWinner();
			if (complete) { shared_->setControl(SharedData::complete_flag); }
		}
	}
}

// handles an active synchronization request
// returns true to signal that s should restart, otherwise false
bool ParallelSolve::waitOnSync(Solver& s) {
	if (!thread_[s.id()]->handleRestartMessage()) {
		shared_->setControl(SharedData::cancel_restart_flag);
	}
	bool hasPath  = thread_[s.id()]->hasPath();
	bool tentative= enumerator().tentative();
	if (shared_->workSem.wait()) {
		// last man standing - complete synchronization request
		shared_->workReq     = 0;
		shared_->restartReq  = 0;
		bool restart         = shared_->hasControl(SharedData::restart_flag);
		bool init            = true;
		if (restart) {
			init = shared_->allowRestart() && !shared_->hasControl(SharedData::cancel_restart_flag);
			if (init) { shared_->globalR.next(); }
			shared_->maxConflict = shared_->allowRestart() && shared_->globalR.idx < maxRestarts_
				? shared_->globalR.current()
				: UINT64_MAX;
		}
		else if (shared_->maxConflict != UINT64_MAX && !shared_->allowRestart()) {
			shared_->maxConflict = UINT64_MAX;
		}
		if (init) { initQueue();  }
		else      { shared_->setControl(SharedData::restart_abandoned_flag); }
		if (tentative && shared_->complete()) {
			if (enumerator().commitComplete()) { shared_->setControl(SharedData::terminate_flag); }
			else                               { shared_->modCount = uint32(0); shared_->clearControl(SharedData::complete_flag); }
		}
		shared_->clearControl(SharedData::msg_split | SharedData::msg_sync_restart | SharedData::restart_abandoned_flag | SharedData::cancel_restart_flag);
		shared_->syncT.lap();
		reportProgress(MessageEvent(s, "SYNC", MessageEvent::completed, shared_->syncT.elapsed()));
		// wake up all blocked threads
		shared_->workSem.reset();
	}
	return shared_->terminate() || (hasPath && !shared_->hasControl(SharedData::restart_abandoned_flag));
}
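// Note (added for clarity): workSem.wait() above is the barrier half of
// BarrierSemaphore - every thread blocks during a sync, and the single
// thread that receives true ("last man standing") completes the request
// and releases the others via workSem.reset().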

// If the guiding-path scheme is active, only one thread can start
// with the initial gp (typically empty) and must then split up the
// search space dynamically. Otherwise, all threads can start with
// the initial gp.
// TODO:
//  heuristic for initial splits?
void ParallelSolve::initQueue() {
	shared_->clearQueue();
	uint64 init = UINT64_MAX;
	if (shared_->allowSplit()) {
		init = 0;
		shared_->workQ.push(shared_->path);
	}
	shared_->initMask = init;
	assert(shared_->allowSplit() || shared_->hasControl(SharedData::forbid_restart_flag));
}

// adds work to the work-queue
void ParallelSolve::pushWork(LitVec* v) {
	assert(v);
	shared_->workQ.push(v);
	shared_->workSem.up();
}

// called whenever some solver proved unsat
bool ParallelSolve::commitUnsat(Solver& s) {
	if (!enumerator().optimize() || shared_->terminate() || shared_->synchronize()) {
		return false;
	}
	if (!thread_[s.id()]->disjointPath()) {
		lock_guard<mutex> lock(shared_->modelM);
		if (!enumerator().commitUnsat(s)) {
			terminate(s, true);
			return false;
		}
		++shared_->modCount;
		return true;
	}
	return enumerator().commitUnsat(s);
}

// called whenever some solver has found a model
bool ParallelSolve::commitModel(Solver& s) {
	// grab lock - models must be processed sequentially
	// in order to simplify printing and to avoid duplicates
	// in all non-trivial enumeration modes
	bool stop = false;
	{lock_guard<mutex> lock(shared_->modelM);
	// first check if the model is still valid once all
	// information is integrated into the solver
	if (thread_[s.id()]->isModel(s) && (stop=shared_->terminate()) == false && enumerator().commitModel(s)) {
		if (enumerator().lastModel().num == 1 && !enumerator().supportsRestarts()) {
			// switch to backtracking-based splitting algorithm:
			// the solver's gp will act as the root for splitting and is
			// from now on disjoint from all other gps
			shared_->setControl(SharedData::forbid_restart_flag | SharedData::allow_split_flag);
			thread_[s.id()]->setGpType(gp_split);
			enumerator().setDisjoint(s, true);
		}
		++shared_->modCount;
		if ((stop = !reportModel(s)) == true) {
			// must be called while holding the lock - otherwise
			// we would have a race condition with solvers that
			// are currently blocking on the mutex and we could enumerate
			// more models than requested by the user
			terminate(s, s.decisionLevel() == 0);
		}
	}}
	return !stop;
}

uint64 ParallelSolve::hasErrors() const {
	return shared_->error;
}
bool ParallelSolve::interrupted() const {
	return shared_->interrupt();
}
void ParallelSolve::resetSolve() {
	shared_->control = 0;
}
void ParallelSolve::enableInterrupts() {}
// updates s with new messages and uses s to create a new guiding path
// if necessary and possible
bool ParallelSolve::handleMessages(Solver& s) {
	// check if there are new messages for s
	if (!shared_->hasMessage()) {
		// nothing to do
		return true;
	}
	ParallelHandler* h = thread_[s.id()];
	if (shared_->terminate()) {
		reportProgress(MessageEvent(s, "TERMINATE", MessageEvent::received));
		h->handleTerminateMessage();
		s.setStopConflict();
		return false;
	}
	if (shared_->synchronize()) {
		reportProgress(MessageEvent(s, "SYNC", MessageEvent::received));
		if (waitOnSync(s)) {
			s.setStopConflict();
			return false;
		}
		return true;
	}
	if (h->disjointPath() && s.splittable() && shared_->workReq > 0) {
		// First declare the split request as handled
		// and only then do the actual split.
		// This way, we minimize the chance of
		// "over"-splitting, i.e. one split request being handled
		// by more than one thread.
		shared_->aboutToSplit();
		reportProgress(MessageEvent(s, "SPLIT", MessageEvent::received));
		h->handleSplitMessage();
		enumerator().setDisjoint(s, true);
	}
	return true;
}

bool ParallelSolve::integrateModels(Solver& s, uint32& upCount) {
	uint32 gCount = shared_->modCount;
	return gCount == upCount || (enumerator().update(s) && (upCount = gCount) == gCount);
}

void ParallelSolve::requestRestart() {
	if (shared_->allowRestart() && ++shared_->restartReq == numThreads()) {
		shared_->postMessage(SharedData::msg_sync_restart, true);
	}
}
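// Note (added for clarity): requestRestart() implements a simple vote -
// every thread that hits its local restart limit increments restartReq, and
// the thread whose increment makes the count equal to numThreads()
// broadcasts msg_sync_restart, turning the individual wishes into one
// synchronized global restart.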

SolveAlgorithm* ParallelSolveOptions::createSolveObject() const {
	return numSolver() > 1 ? new ParallelSolve(0, *this) : BasicSolveOptions::createSolveObject();
}
/////////////////////////////////////////////////////////////////////////////////////////
// ParallelHandler
/////////////////////////////////////////////////////////////////////////////////////////
ParallelHandler::ParallelHandler(ParallelSolve& ctrl, Solver& s, const SolveParams& p)
	: MessageHandler()
	, ctrl_(&ctrl)
	, solver_(&s)
	, params_(&p)
	, received_(0)
	, recEnd_(0)
	, intEnd_(0)
	, error_(0)
	, win_(0)
	, up_(0) {
	this->next = this;
}

ParallelHandler::~ParallelHandler() { clearDB(0); delete [] received_; }

// adds this as post propagator to its solver and attaches the solver to ctx.
bool ParallelHandler::attach(SharedContext& ctx) {
	assert(solver_ && params_);
	gp_.reset();
	error_  = 0;
	win_    = 0;
	up_     = 0;
	act_    = 0;
	next    = 0;
	if (!received_ && ctx.distributor.get()) {
		received_ = new SharedLiterals*[RECEIVE_BUFFER_SIZE];
	}
	ctx.report(message(Event::subsystem_solve, "attach", solver_));
	solver_->addPost(this);
	return ctx.attach(solver_->id());
}

// removes this from the list of post propagators of its solver and detaches the solver from ctx.
void ParallelHandler::detach(SharedContext& ctx, bool) {
	handleTerminateMessage();
	if (solver_->sharedContext() == &ctx) {
		clearDB(!error() ? solver_ : 0);
		ctx.detach(*solver_, error() != 0);
	}
	ctx.report(message(Event::subsystem_solve, "detach", solver_));
}

void ParallelHandler::clearDB(Solver* s) {
	for (ClauseDB::iterator it = integrated_.begin(), end = integrated_.end(); it != end; ++it) {
		ClauseHead* c = static_cast<ClauseHead*>(*it);
		if (s) { s->addLearnt(c, c->size(), Constraint_t::learnt_other); }
		else   { c->destroy(); }
	}
	integrated_.clear();
	intEnd_= 0;
	for (uint32 i = 0; i != recEnd_; ++i) { received_[i]->release(); }
	recEnd_= 0;
}

ValueRep ParallelHandler::solveGP(BasicSolve& solve, GpType t, uint64 restart) {
	ValueRep res = value_free;
	bool     term= false;
	Solver&  s   = solve.solver();
	gp_.reset(restart, t);
	assert(act_ == 0);
	do {
		ctrl_->integrateModels(s, gp_.modCount);
		up_ = act_ = 1; // activate enumerator and bounds
		res = solve.solve();
		up_ = act_ = 0; // de-activate enumerator and bounds
		if      (res == value_true)  { term = !ctrl_->commitModel(s); }
		else if (res == value_false) { term = !ctrl_->commitUnsat(s); solve.reset(term); gp_.reset(restart, gp_.type); }
	} while (!term && res != value_free);
	return res;
}

// detach from solver, i.e. ignore any further messages
void ParallelHandler::handleTerminateMessage() {
	if (this->next != this) {
		// mark the removed propagator by creating a "self-loop"
		solver_->removePost(this);
		this->next = this;
	}
}

// split off a new guiding path and add it to the solve object
void ParallelHandler::handleSplitMessage() {
	assert(solver_ && "ParallelHandler::handleSplitMessage(): not attached!");
	assert(solver_->splittable());
	Solver& s = *solver_;
	SingleOwnerPtr<LitVec> newPath(new LitVec());
	s.split(*newPath);
	ctrl_->pushWork(newPath.release());
}

bool ParallelHandler::handleRestartMessage() {
	// TODO
	// we may want to implement some heuristic, like
	// computing a local var order.
	return true;
}

bool ParallelHandler::simplify(Solver& s, bool sh) {
	ClauseDB::size_type i, j, end = integrated_.size();
	for (i = j = 0; i != end; ++i) {
		Constraint* c = integrated_[i];
		if (c->simplify(s, sh)) {
			c->destroy(&s, false);
			intEnd_ -= (i < intEnd_);
		}
		else                    {
			integrated_[j++] = c;
		}
	}
	shrinkVecTo(integrated_, j);
	if (intEnd_ > integrated_.size()) intEnd_ = integrated_.size();
	return false;
}
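// Note (added for clarity): intEnd_ marks the next overwrite position in the
// bounded integrated_ buffer maintained by add() below; simplify() decrements
// it when clauses in front of that position are removed so the position stays
// consistent with the shrunken database.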

bool ParallelHandler::propagateFixpoint(Solver& s, PostPropagator* ctx) {
	// Check for messages and integrate any new information from
	// models/lemma exchange, but only if the path is set up.
	// Skip updates if called from another post propagator so that we do not
	// disturb any active propagation.
	if (int up = (ctx == 0 && up_ != 0)) {
		up_ ^= s.strategy.upMode;
		up  += (act_ == 0 || (up_ && (s.stats.choices & 63) != 0));
		if (s.stats.conflicts >= gp_.restart)  { ctrl_->requestRestart(); gp_.restart *= 2; }
		for (uint32 cDL = s.decisionLevel();;) {
			bool ok = ctrl_->handleMessages(s) && (up > 1 ? integrate(s) : ctrl_->integrateModels(s, gp_.modCount));
			if (!ok)                         { return false; }
			if (cDL != s.decisionLevel())    { // cancel active propagation on cDL
				for (PostPropagator* n = next; n ; n = n->next){ n->reset(); }
				cDL = s.decisionLevel();
			}
			if      (!s.queueSize())         { if (++up == 3) return true; }
			else if (!s.propagateUntil(this)){ return false; }
		}
	}
	return ctrl_->handleMessages(s);
}

// checks whether s still has a model once all
// information from previously found models has been integrated
bool ParallelHandler::isModel(Solver& s) {
	assert(s.numFreeVars() == 0);
	// either no unprocessed updates or still a model after
	// updates were integrated
	return ctrl_->integrateModels(s, gp_.modCount)
		&& s.numFreeVars() == 0
		&& s.queueSize()   == 0;
}
bool ParallelHandler::integrate(Solver& s) {
	uint32 rec = recEnd_ + s.receive(received_ + recEnd_, RECEIVE_BUFFER_SIZE - recEnd_);
	if (!rec) { return true; }
	ClauseCreator::Result ret;
	uint32 dl       = s.decisionLevel(), added = 0, i = 0;
	uint32 intFlags = ctrl_->integrateFlags();
	recEnd_         = 0;
	if (s.strategy.updateLbd || params_->reduce.strategy.glue != 0) {
		intFlags |= ClauseCreator::clause_int_lbd;
	}
	do {
		ret    = ClauseCreator::integrate(s, received_[i++], intFlags, Constraint_t::learnt_other);
		added += ret.status != ClauseCreator::status_subsumed;
		if (ret.local) { add(ret.local); }
		if (ret.unit()){ s.stats.addIntegratedAsserting(dl, s.decisionLevel()); dl = s.decisionLevel(); }
		if (!ret.ok()) { while (i != rec) { received_[recEnd_++] = received_[i++]; } }
	} while (i != rec);
	s.stats.addIntegrated(added);
	return !s.hasConflict();
}
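// Note (added for clarity): if integrating a clause fails (ret.ok() is false,
// e.g. because it produced a conflict), the loop above copies the remaining
// received clauses back to the front of received_ and records them in recEnd_
// so that they are retried on the next call to integrate().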

void ParallelHandler::add(ClauseHead* h) {
	if (intEnd_ < integrated_.size()) {
		ClauseHead* o = (ClauseHead*)integrated_[intEnd_];
		integrated_[intEnd_] = h;
		assert(o);
		if (!ctrl_->integrateUseHeuristic() || o->locked(*solver_) || o->activity().activity() > 0) {
			solver_->addLearnt(o, o->size(), Constraint_t::learnt_other);
		}
		else {
			o->destroy(solver_, true);
			solver_->stats.removeIntegrated();
		}
	}
	else {
		integrated_.push_back(h);
	}
	if (++intEnd_ >= ctrl_->integrateGrace()) {
		intEnd_ = 0;
	}
}
/////////////////////////////////////////////////////////////////////////////////////////
// GlobalQueue
/////////////////////////////////////////////////////////////////////////////////////////
GlobalQueue::GlobalQueue(const Policy& p, uint32 maxT, uint32 topo) : Distributor(p), queue_(0) {
	assert(maxT <= ParallelSolveOptions::supportedSolvers());
	queue_     = new Queue(maxT);
	threadId_  = new ThreadInfo[maxT];
	for (uint32 i = 0; i != maxT; ++i) {
		threadId_[i].id       = queue_->addThread();
		threadId_[i].peerMask = populatePeerMask(topo, maxT, i);
	}
}
GlobalQueue::~GlobalQueue() {
	release();
}
void GlobalQueue::release() {
	if (queue_) {
		for (uint32 i = 0; i != queue_->maxThreads(); ++i) {
			Queue::ThreadId& id = getThreadId(i);
			for (DistPair n; queue_->tryConsume(id, n); ) {
				if (n.sender != i) { n.lits->release(); }
			}
		}
		delete queue_;
		queue_ = 0;
		delete [] threadId_;
	}
}
uint64 GlobalQueue::populateFromCube(uint32 numThreads, uint32 myId, bool ext) const {
	uint32 n = numThreads;
	uint32 k = 1;
	for (uint32 i = n / 2; i > 0; i /= 2, k *= 2) { }
	uint64 res = 0, x = 1;
	for (uint32 m = 1; m <= k; m *= 2) {
		uint32 i = m ^ myId;
		if      (i < n)         { res |= (x << i);     }
		else if (ext && k != m) { res |= (x << (i^k)); }
	}
	if (ext) {
		uint32 s = k ^ myId;
		for (uint32 m = 1; m < k && s >= n; m *= 2) {
			uint32 i = m ^ s;
			if (i < n) { res |= (x << i); }
		}
	}
	assert( (res & (x<<myId)) == 0 );
	return res;
}
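// Worked example (added for clarity): for numThreads == 8 and ext == false,
// the first loop yields k == 8, so thread 0 is linked to peers {1, 2, 4}
// (mask 0x16) and thread 5 to peers {4, 7, 1} (mask 0x92) - the standard
// hypercube neighbors obtained by flipping a single bit of the thread id.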
uint64 GlobalQueue::populatePeerMask(uint32 topo, uint32 maxT, uint32 id) const {
	switch (topo) {
		case ParallelSolveOptions::Integration::topo_ring: {
			uint32 prev = id > 0 ? id - 1 : maxT - 1;
			uint32 next = (id + 1) % maxT;
			return Distributor::mask(prev) | Distributor::mask(next);
		}
		case ParallelSolveOptions::Integration::topo_cube:  return populateFromCube(maxT, id, false);
		case ParallelSolveOptions::Integration::topo_cubex: return populateFromCube(maxT, id, true);
		default:                                            return Distributor::initSet(maxT) ^ Distributor::mask(id);
	}
}

void GlobalQueue::publish(const Solver& s, SharedLiterals* n) {
	assert(n->refCount() >= (queue_->maxThreads()-1));
	queue_->publish(DistPair(s.id(), n), getThreadId(s.id()));
}
uint32 GlobalQueue::receive(const Solver& in, SharedLiterals** out, uint32 maxn) {
	uint32 r = 0;
	Queue::ThreadId& id = getThreadId(in.id());
	uint64 peers = getPeerMask(in.id());
	for (DistPair n; r != maxn && queue_->tryConsume(id, n); ) {
		if (n.sender != in.id()) {
			if      (inSet(peers, n.sender)) { out[r++] = n.lits; }
			else if (n.lits->size() == 1)    { out[r++] = n.lits; }
			else                             { n.lits->release(); }
		}
	}
	return r;
}
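// Note (added for clarity): receive() keeps a clause if its sender is a
// configured peer and additionally keeps unit clauses from any sender;
// everything else is released immediately. The topology thus shapes lemma
// exchange without blocking the propagation of unit facts.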
} } // namespace Clasp::mt

#endif

