00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #if WITH_THREADS
00021 #include <clasp/parallel_solve.h>
00022 #include <clasp/solver.h>
00023 #include <clasp/clause.h>
00024 #include <clasp/enumerator.h>
00025 #include <clasp/util/timer.h>
00026 #include <clasp/minimize_constraint.h>
00027 #include <clasp/util/mutex.h>
00028 #include <tbb/concurrent_queue.h>
00029 namespace Clasp { namespace mt {
00031
00033
00034 class BarrierSemaphore {
00035 public:
00036 explicit BarrierSemaphore(int counter = 0, int maxParties = 1) : counter_(counter), active_(maxParties) {}
00037
00038
00039
00040
00041 void unsafe_init(int counter = 0, int maxParties = 1) {
00042 counter_ = counter;
00043 active_ = maxParties;
00044 }
00045
00046 int counter() { lock_guard<mutex> lock(semMutex_); return counter_; }
00047
00048 int parties() { lock_guard<mutex> lock(semMutex_); return active_; }
00049
00050 bool active() { lock_guard<mutex> lock(semMutex_); return unsafe_active(); }
00051
00052
00053
00054
00055
00056 void addParty() {
00057 lock_guard<mutex> lock(semMutex_);
00058 ++active_;
00059 }
00060
00061
00062
00063 void removeParty(bool reset) {
00064 unique_lock<mutex> lock(semMutex_);
00065 assert(active_ > 0);
00066 --active_;
00067 if (reset) { unsafe_reset(0); }
00068 else if (unsafe_active()) { counter_ = -active_; lock.unlock(); semCond_.notify_one(); }
00069 }
00070
00071
00072
00073
00074
00075 bool wait() {
00076 unique_lock<mutex> lock(semMutex_);
00077 if (--counter_ >= 0) { counter_ = -1; }
00078 return unsafe_wait(lock);
00079 }
00080
00081 void reset(uint32 semCount = 0) {
00082 lock_guard<mutex> lock(semMutex_);
00083 unsafe_reset(semCount);
00084 }
00085
00086
00087
00088
00089
00090
00091
00092 bool down() {
00093 unique_lock<mutex> lock(semMutex_);
00094 if (--counter_ >= 0) { return true; }
00095 return !unsafe_wait(lock);
00096 }
00097
00098
00099
00100 void up() {
00101 bool notify;
00102 {
00103 lock_guard<mutex> lock(semMutex_);
00104 notify = (++counter_ < 1);
00105 }
00106 if (notify) semCond_.notify_one();
00107 }
00108 private:
00109 BarrierSemaphore(const BarrierSemaphore&);
00110 BarrierSemaphore& operator=(const BarrierSemaphore&);
00111 typedef condition_variable cv;
00112 bool unsafe_active() const { return -counter_ >= active_; }
00113 void unsafe_reset(uint32 semCount) {
00114 int prev = counter_;
00115 counter_ = semCount;
00116 if (prev < 0) { semCond_.notify_all(); }
00117 }
00118
00119 bool unsafe_wait(unique_lock<mutex>& lock) {
00120 assert(counter_ < 0);
00121
00122 if (!unsafe_active()) {
00123 semCond_.wait(lock);
00124 }
00125 return unsafe_active();
00126 }
00127 cv semCond_;
00128 mutex semMutex_;
00129 int counter_;
00130 int active_;
00131 };
00133
00135 struct ParallelSolve::SharedData {
00136 typedef tbb::concurrent_queue<const LitVec*> queue;
00137 enum MsgFlag {
00138 terminate_flag = 1u, sync_flag = 2u, split_flag = 4u,
00139 restart_flag = 8u, complete_flag = 16u,
00140 interrupt_flag = 32u,
00141 allow_split_flag = 64u,
00142 forbid_restart_flag = 128u,
00143 cancel_restart_flag = 256u,
00144 restart_abandoned_flag= 512u,
00145 };
00146 enum Message {
00147 msg_terminate = (terminate_flag),
00148 msg_interrupt = (terminate_flag | interrupt_flag),
00149 msg_sync_restart = (sync_flag | restart_flag),
00150 msg_split = split_flag
00151 };
00152 SharedData() : path(0) { reset(0); control = 0; }
00153 void reset(SharedContext* a_ctx) {
00154 clearQueue();
00155 syncT.reset();
00156 workSem.unsafe_init(0, a_ctx ? a_ctx->concurrency() : 0);
00157 globalR.reset();
00158 maxConflict = globalR.current();
00159 error = 0;
00160 initMask = 0;
00161 ctx = a_ctx;
00162 path = 0;
00163 nextId = 1;
00164 workReq = 0;
00165 restartReq = 0;
00166 }
00167 void clearQueue() {
00168 for (const LitVec* a = 0; workQ.try_pop(a); ) {
00169 if (a != path) { delete a; }
00170 }
00171 }
00172 const LitVec* requestWork(uint32 id) {
00173
00174 uint64 m(uint64(1) << id);
00175 uint64 init = initMask;
00176 if ((m & init) == m) { initMask -= m; return path; }
00177
00178 const LitVec* res = 0;
00179 if (workQ.try_pop(res)) { return res; }
00180 return 0;
00181 }
00182
00183 bool hasMessage() const { return (control & uint32(7)) != 0; }
00184 bool synchronize() const { return (control & uint32(sync_flag)) != 0; }
00185 bool terminate() const { return (control & uint32(terminate_flag)) != 0; }
00186 bool split() const { return (control & uint32(split_flag)) != 0; }
00187 bool postMessage(Message m, bool notify);
00188 void aboutToSplit() { if (--workReq == 0) updateSplitFlag(); }
00189 void updateSplitFlag();
00190
00191 bool hasControl(uint32 f) const { return (control & f) != 0; }
00192 bool interrupt() const { return hasControl(interrupt_flag);}
00193 bool complete() const { return hasControl(complete_flag); }
00194 bool restart() const { return hasControl(restart_flag); }
00195 bool allowSplit() const { return hasControl(allow_split_flag); }
00196 bool allowRestart() const { return !hasControl(forbid_restart_flag); }
00197 bool setControl(uint32 flags) { return (fetch_and_or(control, flags) & flags) != flags; }
00198 bool clearControl(uint32 flags) { return (fetch_and_and(control, ~flags) & flags) == flags; }
00199 ScheduleStrategy globalR;
00200 uint64 maxConflict;
00201 uint64 error;
00202 SharedContext* ctx;
00203 const LitVec* path;
00204 atomic<uint64> initMask;
00205 Timer<RealTime> syncT;
00206 mutex modelM;
00207 BarrierSemaphore workSem;
00208 queue workQ;
00209 uint32 nextId;
00210 atomic<int> workReq;
00211 atomic<uint32> restartReq;
00212 atomic<uint32> control;
00213 atomic<uint32> modCount;
00214 };
00215
00216
00217 bool ParallelSolve::SharedData::postMessage(Message m, bool notifyWaiting) {
00218 if (m == msg_split) {
00219 if (++workReq == 1) { updateSplitFlag(); }
00220 return true;
00221 }
00222 else if (setControl(m)) {
00223
00224 if (notifyWaiting) workSem.reset();
00225 if ((uint32(m) & uint32(sync_flag|terminate_flag)) != 0) {
00226 syncT.reset();
00227 syncT.start();
00228 }
00229 return true;
00230 }
00231 return false;
00232 }
00233
00234 void ParallelSolve::SharedData::updateSplitFlag() {
00235 for (bool splitF;;) {
00236 splitF = (workReq > 0);
00237 if (split() == splitF) return;
00238 if (splitF) fetch_and_or(control, uint32(split_flag));
00239 else fetch_and_and(control, ~uint32(split_flag));
00240 }
00241 }
00243
00245 ParallelSolve::ParallelSolve(Enumerator* e, const ParallelSolveOptions& opts)
00246 : SolveAlgorithm(e, opts.limit)
00247 , shared_(new SharedData)
00248 , thread_(0)
00249 , distribution_(opts.distribute)
00250 , maxRestarts_(0)
00251 , intGrace_(1024)
00252 , intTopo_(opts.integrate.topo)
00253 , intFlags_(ClauseCreator::clause_not_root_sat | ClauseCreator::clause_no_add)
00254 , initialGp_(opts.algorithm.mode == ParallelSolveOptions::Algorithm::mode_split ? gp_split : gp_fixed) {
00255 setRestarts(opts.restarts.maxR, opts.restarts.sched);
00256 setIntegrate(opts.integrate.grace, opts.integrate.filter);
00257 }
00258
00259 ParallelSolve::~ParallelSolve() {
00260 if (shared_->nextId > 1) {
00261
00262
00263 ParallelSolve::doInterrupt();
00264 shared_->workSem.removeParty(true);
00265 joinThreads();
00266 }
00267 destroyThread(masterId);
00268 delete shared_;
00269 }
00270
00271 bool ParallelSolve::beginSolve(SharedContext& ctx) {
00272 assert(ctx.concurrency() && "Illegal number of threads");
00273 if (shared_->terminate()) { return false; }
00274 shared_->reset(&ctx);
00275 shared_->setControl(initialGp_ == gp_split ? SharedData::allow_split_flag : SharedData::forbid_restart_flag);
00276 shared_->setControl(SharedData::sync_flag);
00277 shared_->modCount = uint32(enumerator().optimize());
00278 if (distribution_.types != 0 && ctx.distributor.get() == 0) {
00279 ctx.distributor.reset(new mt::GlobalQueue(distribution_, ctx.concurrency(), intTopo_));
00280 }
00281 reportProgress(MessageEvent(*ctx.master(), "SYNC", MessageEvent::sent));
00282 for (uint32 i = 1; i != ctx.concurrency(); ++i) {
00283 uint32 id = shared_->nextId++;
00284 allocThread(id, *ctx.solver(id), ctx.configuration()->search(id));
00285
00286 Clasp::thread x(std::mem_fun(&ParallelSolve::solveParallel), this, id);
00287 thread_[id]->setThread(x);
00288 }
00289 return true;
00290 }
00291
00292 void ParallelSolve::setIntegrate(uint32 grace, uint8 filter) {
00293 typedef ParallelSolveOptions::Integration Dist;
00294 intGrace_ = grace;
00295 intFlags_ = ClauseCreator::clause_no_add;
00296 if (filter == Dist::filter_heuristic) { store_set_bit(intFlags_, 31); }
00297 if (filter != Dist::filter_no) { intFlags_ |= ClauseCreator::clause_not_root_sat; }
00298 if (filter == Dist::filter_sat) { intFlags_ |= ClauseCreator::clause_not_sat; }
00299 }
00300
00301 void ParallelSolve::setRestarts(uint32 maxR, const ScheduleStrategy& rs) {
00302 maxRestarts_ = maxR;
00303 shared_->globalR = maxR ? rs : ScheduleStrategy::none();
00304 shared_->maxConflict = shared_->globalR.current();
00305 }
00306
00307 uint32 ParallelSolve::numThreads() const { return shared_->workSem.parties(); }
00308
00309 void ParallelSolve::allocThread(uint32 id, Solver& s, const SolveParams& p) {
00310 if (!thread_) {
00311 uint32 n = numThreads();
00312 thread_ = new ParallelHandler*[n];
00313 std::fill(thread_, thread_+n, static_cast<ParallelHandler*>(0));
00314 }
00315 CLASP_PRAGMA_TODO("replace with CACHE_LINE_ALIGNED alloc")
00316 uint32 b = ((sizeof(ParallelHandler)+63) / 64) * 64;
00317 thread_[id]= new (::operator new( b )) ParallelHandler(*this, s, p);
00318 }
00319
00320 void ParallelSolve::destroyThread(uint32 id) {
00321 if (thread_ && thread_[id]) {
00322 assert(!thread_[id]->joinable() && "Shutdown not completed!");
00323 thread_[id]->~ParallelHandler();
00324 ::operator delete(thread_[id]);
00325 thread_[id] = 0;
00326 if (id == masterId) {
00327 delete [] thread_;
00328 thread_ = 0;
00329 }
00330 }
00331 }
00332
00333 inline void ParallelSolve::reportProgress(const Event& ev) const {
00334 return shared_->ctx->report(ev);
00335 }
00336
00337
00338 void ParallelSolve::joinThreads() {
00339 int ec = thread_[masterId]->error();
00340 uint32 winner = thread_[masterId]->winner() ? uint32(masterId) : UINT32_MAX;
00341 shared_->error= ec ? 1 : 0;
00342 for (uint32 i = 1, end = shared_->nextId; i != end; ++i) {
00343 if (thread_[i]->join() != 0) {
00344 shared_->error |= uint64(1) << i;
00345 ec = std::max(ec, thread_[i]->error());
00346 }
00347 if (thread_[i]->winner() && i < winner) {
00348 winner = i;
00349 }
00350 destroyThread(i);
00351 }
00352
00353 thread_[masterId]->detach(*shared_->ctx, shared_->interrupt());
00354 thread_[masterId]->setError(!shared_->interrupt() ? thread_[masterId]->error() : ec);
00355 shared_->ctx->setWinner(winner);
00356 shared_->nextId = 1;
00357 shared_->syncT.stop();
00358 reportProgress(MessageEvent(*shared_->ctx->master(), "TERMINATE", MessageEvent::completed, shared_->syncT.total()));
00359 }
00360
00361
00362 bool ParallelSolve::doSolve(SharedContext& ctx, const LitVec& path) {
00363 if (beginSolve(ctx)) {
00364 Solver& s = *ctx.master();
00365 assert(s.id() == masterId);
00366 allocThread(masterId, s, ctx.configuration()->search(masterId));
00367 shared_->path = &path;
00368 shared_->syncT.start();
00369 solveParallel(masterId);
00370 reportProgress(message(Event::subsystem_solve, "Joining with other threads", &s));
00371 joinThreads();
00372 int err = thread_[masterId]->error();
00373 destroyThread(masterId);
00374 shared_->ctx->distributor.reset(0);
00375 switch(err) {
00376 case error_none : break;
00377 case error_oom : throw std::bad_alloc();
00378 case error_runtime: throw std::runtime_error("RUNTIME ERROR!");
00379 default: throw std::runtime_error("UNKNOWN ERROR!");
00380 }
00381 }
00382 return !shared_->complete();
00383 }
00384
00385
00386 void ParallelSolve::solveParallel(uint32 id) {
00387 Solver& s = thread_[id]->solver();
00388 const SolveParams& p= thread_[id]->params();
00389 SolveLimits lim = limits();
00390 SolverStats agg;
00391 PathPtr a(0);
00392 try {
00393
00394
00395
00396 thread_[id]->attach(*shared_->ctx);
00397 BasicSolve solve(s, p, &lim);
00398 agg.enableStats(s.stats);
00399 for (GpType t; requestWork(s, a); solve.reset()) {
00400 agg.accu(s.stats);
00401 s.stats.reset();
00402 thread_[id]->setGpType(t = a.is_owner() ? gp_split : initialGp_);
00403 if (enumerator().start(s, *a, a.is_owner()) && thread_[id]->solveGP(solve, t, shared_->maxConflict) == value_free) {
00404 terminate(s, false);
00405 }
00406 s.clearStopConflict();
00407 enumerator().end(s);
00408 }
00409 }
00410 catch (const std::bad_alloc&) { exception(id,a,error_oom, "ERROR: std::bad_alloc" ); }
00411 catch (const std::exception& e){ exception(id,a,error_runtime, e.what()); }
00412 catch (...) { exception(id,a,error_other, "ERROR: unknown"); }
00413 assert(shared_->terminate() || thread_[id]->error() != error_none);
00414
00415 shared_->workSem.removeParty(shared_->terminate());
00416
00417 s.stats.accu(agg);
00418 if (id != masterId) {
00419
00420
00421
00422 thread_[id]->detach(*shared_->ctx, shared_->interrupt());
00423 s.stats.addCpuTime(ThreadTime::getTime());
00424 }
00425 }
00426
00427 void ParallelSolve::exception(uint32 id, PathPtr& path, ErrorCode e, const char* what) {
00428 try {
00429 reportProgress(message(Event::subsystem_solve, what, &thread_[id]->solver()));
00430 thread_[id]->setError(e);
00431 if (id == masterId || shared_->workSem.active()) {
00432 ParallelSolve::doInterrupt();
00433 return;
00434 }
00435 else if (path.get() && shared_->allowSplit()) {
00436 shared_->workQ.push(path.release());
00437 shared_->workSem.up();
00438 }
00439 reportProgress(warning(Event::subsystem_solve, "Thread failed and was removed.", &thread_[id]->solver()));
00440 }
00441 catch(...) { ParallelSolve::doInterrupt(); }
00442 }
00443
00444
00445 bool ParallelSolve::doInterrupt() {
00446
00447
00448 shared_->postMessage(SharedData::msg_interrupt, false);
00449 return true;
00450 }
00451
00452
00453 bool ParallelSolve::requestWork(Solver& s, PathPtr& out) {
00454 const LitVec* a = 0;
00455 while (!shared_->terminate()) {
00456
00457
00458 if (!s.popRootLevel(s.rootLevel())) {
00459
00460 terminate(s, true);
00461 }
00462 else if (shared_->synchronize()) {
00463
00464
00465 waitOnSync(s);
00466 }
00467 else if (a || (a = shared_->requestWork(s.id())) != 0) {
00468 assert(s.decisionLevel() == 0);
00469
00470 out = a;
00471
00472 if (a == shared_->path) { out.release(); }
00473
00474 if (s.simplify()) { return true; }
00475
00476
00477
00478 }
00479 else if (shared_->allowSplit()) {
00480
00481
00482 reportProgress(MessageEvent(s, "SPLIT", MessageEvent::sent));
00483 shared_->postMessage(SharedData::msg_split, false);
00484 if (!shared_->workSem.down() && !shared_->synchronize()) {
00485
00486
00487 terminate(s, true);
00488 }
00489 }
00490 else {
00491
00492
00493 terminate(s, true);
00494 }
00495 }
00496 return false;
00497 }
00498
00499
00500
00501 void ParallelSolve::terminate(Solver& s, bool complete) {
00502 if (!shared_->terminate()) {
00503 if (enumerator().tentative() && complete) {
00504 if (shared_->setControl(SharedData::sync_flag|SharedData::complete_flag)) {
00505 thread_[s.id()]->setWinner();
00506 reportProgress(MessageEvent(s, "SYNC", MessageEvent::sent));
00507 }
00508 }
00509 else {
00510 reportProgress(MessageEvent(s, "TERMINATE", MessageEvent::sent));
00511 shared_->postMessage(SharedData::msg_terminate, true);
00512 thread_[s.id()]->setWinner();
00513 if (complete) { shared_->setControl(SharedData::complete_flag); }
00514 }
00515 }
00516 }
00517
00518
00519
00520 bool ParallelSolve::waitOnSync(Solver& s) {
00521 if (!thread_[s.id()]->handleRestartMessage()) {
00522 shared_->setControl(SharedData::cancel_restart_flag);
00523 }
00524 bool hasPath = thread_[s.id()]->hasPath();
00525 bool tentative= enumerator().tentative();
00526 if (shared_->workSem.wait()) {
00527
00528 shared_->workReq = 0;
00529 shared_->restartReq = 0;
00530 bool restart = shared_->hasControl(SharedData::restart_flag);
00531 bool init = true;
00532 if (restart) {
00533 init = shared_->allowRestart() && !shared_->hasControl(SharedData::cancel_restart_flag);
00534 if (init) { shared_->globalR.next(); }
00535 shared_->maxConflict = shared_->allowRestart() && shared_->globalR.idx < maxRestarts_
00536 ? shared_->globalR.current()
00537 : UINT64_MAX;
00538 }
00539 else if (shared_->maxConflict != UINT64_MAX && !shared_->allowRestart()) {
00540 shared_->maxConflict = UINT64_MAX;
00541 }
00542 if (init) { initQueue(); }
00543 else { shared_->setControl(SharedData::restart_abandoned_flag); }
00544 if (tentative && shared_->complete()) {
00545 if (enumerator().commitComplete()) { shared_->setControl(SharedData::terminate_flag); }
00546 else { shared_->modCount = uint32(0); shared_->clearControl(SharedData::complete_flag); }
00547 }
00548 shared_->clearControl(SharedData::msg_split | SharedData::msg_sync_restart | SharedData::restart_abandoned_flag | SharedData::cancel_restart_flag);
00549 shared_->syncT.lap();
00550 reportProgress(MessageEvent(s, "SYNC", MessageEvent::completed, shared_->syncT.elapsed()));
00551
00552 shared_->workSem.reset();
00553 }
00554 return shared_->terminate() || (hasPath && !shared_->hasControl(SharedData::restart_abandoned_flag));
00555 }
00556
00557
00558
00559
00560
00561
00562
00563 void ParallelSolve::initQueue() {
00564 shared_->clearQueue();
00565 uint64 init = UINT64_MAX;
00566 if (shared_->allowSplit()) {
00567 init = 0;
00568 shared_->workQ.push(shared_->path);
00569 }
00570 shared_->initMask = init;
00571 assert(shared_->allowSplit() || shared_->hasControl(SharedData::forbid_restart_flag));
00572 }
00573
00574
00575 void ParallelSolve::pushWork(LitVec* v) {
00576 assert(v);
00577 shared_->workQ.push(v);
00578 shared_->workSem.up();
00579 }
00580
00581
00582 bool ParallelSolve::commitUnsat(Solver& s) {
00583 if (!enumerator().optimize() || shared_->terminate() || shared_->synchronize()) {
00584 return false;
00585 }
00586 if (!thread_[s.id()]->disjointPath()) {
00587 lock_guard<mutex> lock(shared_->modelM);
00588 if (!enumerator().commitUnsat(s)) {
00589 terminate(s, true);
00590 return false;
00591 }
00592 ++shared_->modCount;
00593 return true;
00594 }
00595 return enumerator().commitUnsat(s);
00596 }
00597
00598
00599 bool ParallelSolve::commitModel(Solver& s) {
00600
00601
00602
00603 bool stop = false;
00604 {lock_guard<mutex> lock(shared_->modelM);
00605
00606
00607 if (thread_[s.id()]->isModel(s) && (stop=shared_->terminate()) == false && enumerator().commitModel(s)) {
00608 if (enumerator().lastModel().num == 1 && !enumerator().supportsRestarts()) {
00609
00610
00611
00612 shared_->setControl(SharedData::forbid_restart_flag | SharedData::allow_split_flag);
00613 thread_[s.id()]->setGpType(gp_split);
00614 enumerator().setDisjoint(s, true);
00615 }
00616 ++shared_->modCount;
00617 if ((stop = !reportModel(s)) == true) {
00618
00619
00620
00621
00622 terminate(s, s.decisionLevel() == 0);
00623 }
00624 }}
00625 return !stop;
00626 }
00627
00628 uint64 ParallelSolve::hasErrors() const {
00629 return shared_->error;
00630 }
00631 bool ParallelSolve::interrupted() const {
00632 return shared_->interrupt();
00633 }
00634 void ParallelSolve::resetSolve() {
00635 shared_->control = 0;
00636 }
00637 void ParallelSolve::enableInterrupts() {}
00638
00639
00640 bool ParallelSolve::handleMessages(Solver& s) {
00641
00642 if (!shared_->hasMessage()) {
00643
00644 return true;
00645 }
00646 ParallelHandler* h = thread_[s.id()];
00647 if (shared_->terminate()) {
00648 reportProgress(MessageEvent(s, "TERMINATE", MessageEvent::received));
00649 h->handleTerminateMessage();
00650 s.setStopConflict();
00651 return false;
00652 }
00653 if (shared_->synchronize()) {
00654 reportProgress(MessageEvent(s, "SYNC", MessageEvent::received));
00655 if (waitOnSync(s)) {
00656 s.setStopConflict();
00657 return false;
00658 }
00659 return true;
00660 }
00661 if (h->disjointPath() && s.splittable() && shared_->workReq > 0) {
00662
00663
00664
00665
00666
00667 shared_->aboutToSplit();
00668 reportProgress(MessageEvent(s, "SPLIT", MessageEvent::received));
00669 h->handleSplitMessage();
00670 enumerator().setDisjoint(s, true);
00671 }
00672 return true;
00673 }
00674
00675 bool ParallelSolve::integrateModels(Solver& s, uint32& upCount) {
00676 uint32 gCount = shared_->modCount;
00677 return gCount == upCount || (enumerator().update(s) && (upCount = gCount) == gCount);
00678 }
00679
00680 void ParallelSolve::requestRestart() {
00681 if (shared_->allowRestart() && ++shared_->restartReq == numThreads()) {
00682 shared_->postMessage(SharedData::msg_sync_restart, true);
00683 }
00684 }
00685
00686 SolveAlgorithm* ParallelSolveOptions::createSolveObject() const {
00687 return numSolver() > 1 ? new ParallelSolve(0, *this) : BasicSolveOptions::createSolveObject();
00688 }
00690
00692 ParallelHandler::ParallelHandler(ParallelSolve& ctrl, Solver& s, const SolveParams& p)
00693 : MessageHandler()
00694 , ctrl_(&ctrl)
00695 , solver_(&s)
00696 , params_(&p)
00697 , received_(0)
00698 , recEnd_(0)
00699 , intEnd_(0)
00700 , error_(0)
00701 , win_(0)
00702 , up_(0) {
00703 this->next = this;
00704 }
00705
00706 ParallelHandler::~ParallelHandler() { clearDB(0); delete [] received_; }
00707
00708
00709 bool ParallelHandler::attach(SharedContext& ctx) {
00710 assert(solver_ && params_);
00711 gp_.reset();
00712 error_ = 0;
00713 win_ = 0;
00714 up_ = 0;
00715 act_ = 0;
00716 next = 0;
00717 if (!received_ && ctx.distributor.get()) {
00718 received_ = new SharedLiterals*[RECEIVE_BUFFER_SIZE];
00719 }
00720 ctx.report(message(Event::subsystem_solve, "attach", solver_));
00721 solver_->addPost(this);
00722 return ctx.attach(solver_->id());
00723 }
00724
00725
00726 void ParallelHandler::detach(SharedContext& ctx, bool) {
00727 handleTerminateMessage();
00728 if (solver_->sharedContext() == &ctx) {
00729 clearDB(!error() ? solver_ : 0);
00730 ctx.detach(*solver_, error() != 0);
00731 }
00732 ctx.report(message(Event::subsystem_solve, "detach", solver_));
00733 }
00734
00735 void ParallelHandler::clearDB(Solver* s) {
00736 for (ClauseDB::iterator it = integrated_.begin(), end = integrated_.end(); it != end; ++it) {
00737 ClauseHead* c = static_cast<ClauseHead*>(*it);
00738 if (s) { s->addLearnt(c, c->size(), Constraint_t::learnt_other); }
00739 else { c->destroy(); }
00740 }
00741 integrated_.clear();
00742 intEnd_= 0;
00743 for (uint32 i = 0; i != recEnd_; ++i) { received_[i]->release(); }
00744 recEnd_= 0;
00745 }
00746
00747 ValueRep ParallelHandler::solveGP(BasicSolve& solve, GpType t, uint64 restart) {
00748 ValueRep res = value_free;
00749 bool term= false;
00750 Solver& s = solve.solver();
00751 gp_.reset(restart, t);
00752 assert(act_ == 0);
00753 do {
00754 ctrl_->integrateModels(s, gp_.modCount);
00755 up_ = act_ = 1;
00756 res = solve.solve();
00757 up_ = act_ = 0;
00758 if (res == value_true) { term = !ctrl_->commitModel(s); }
00759 else if (res == value_false) { term = !ctrl_->commitUnsat(s); solve.reset(term); gp_.reset(restart, gp_.type); }
00760 } while (!term && res != value_free);
00761 return res;
00762 }
00763
00764
00765 void ParallelHandler::handleTerminateMessage() {
00766 if (this->next != this) {
00767
00768 solver_->removePost(this);
00769 this->next = this;
00770 }
00771 }
00772
00773
00774 void ParallelHandler::handleSplitMessage() {
00775 assert(solver_ && "ParallelHandler::handleSplitMessage(): not attached!");
00776 assert(solver_->splittable());
00777 Solver& s = *solver_;
00778 SingleOwnerPtr<LitVec> newPath(new LitVec());
00779 s.split(*newPath);
00780 ctrl_->pushWork(newPath.release());
00781 }
00782
00783 bool ParallelHandler::handleRestartMessage() {
00784
00785
00786
00787 return true;
00788 }
00789
00790 bool ParallelHandler::simplify(Solver& s, bool sh) {
00791 ClauseDB::size_type i, j, end = integrated_.size();
00792 for (i = j = 0; i != end; ++i) {
00793 Constraint* c = integrated_[i];
00794 if (c->simplify(s, sh)) {
00795 c->destroy(&s, false);
00796 intEnd_ -= (i < intEnd_);
00797 }
00798 else {
00799 integrated_[j++] = c;
00800 }
00801 }
00802 shrinkVecTo(integrated_, j);
00803 if (intEnd_ > integrated_.size()) intEnd_ = integrated_.size();
00804 return false;
00805 }
00806
00807 bool ParallelHandler::propagateFixpoint(Solver& s, PostPropagator* ctx) {
00808
00809
00810
00811
00812 if (int up = (ctx == 0 && up_ != 0)) {
00813 up_ ^= s.strategy.upMode;
00814 up += (act_ == 0 || (up_ && (s.stats.choices & 63) != 0));
00815 if (s.stats.conflicts >= gp_.restart) { ctrl_->requestRestart(); gp_.restart *= 2; }
00816 for (uint32 cDL = s.decisionLevel();;) {
00817 bool ok = ctrl_->handleMessages(s) && (up > 1 ? integrate(s) : ctrl_->integrateModels(s, gp_.modCount));
00818 if (!ok) { return false; }
00819 if (cDL != s.decisionLevel()) {
00820 for (PostPropagator* n = next; n ; n = n->next){ n->reset(); }
00821 cDL = s.decisionLevel();
00822 }
00823 if (!s.queueSize()) { if (++up == 3) return true; }
00824 else if (!s.propagateUntil(this)){ return false; }
00825 }
00826 }
00827 return ctrl_->handleMessages(s);
00828 }
00829
00830
00831
00832 bool ParallelHandler::isModel(Solver& s) {
00833 assert(s.numFreeVars() == 0);
00834
00835
00836 return ctrl_->integrateModels(s, gp_.modCount)
00837 && s.numFreeVars() == 0
00838 && s.queueSize() == 0;
00839 }
00840 bool ParallelHandler::integrate(Solver& s) {
00841 uint32 rec = recEnd_ + s.receive(received_ + recEnd_, RECEIVE_BUFFER_SIZE - recEnd_);
00842 if (!rec) { return true; }
00843 ClauseCreator::Result ret;
00844 uint32 dl = s.decisionLevel(), added = 0, i = 0;
00845 uint32 intFlags = ctrl_->integrateFlags();
00846 recEnd_ = 0;
00847 if (s.strategy.updateLbd || params_->reduce.strategy.glue != 0) {
00848 intFlags |= ClauseCreator::clause_int_lbd;
00849 }
00850 do {
00851 ret = ClauseCreator::integrate(s, received_[i++], intFlags, Constraint_t::learnt_other);
00852 added += ret.status != ClauseCreator::status_subsumed;
00853 if (ret.local) { add(ret.local); }
00854 if (ret.unit()){ s.stats.addIntegratedAsserting(dl, s.decisionLevel()); dl = s.decisionLevel(); }
00855 if (!ret.ok()) { while (i != rec) { received_[recEnd_++] = received_[i++]; } }
00856 } while (i != rec);
00857 s.stats.addIntegrated(added);
00858 return !s.hasConflict();
00859 }
00860
00861 void ParallelHandler::add(ClauseHead* h) {
00862 if (intEnd_ < integrated_.size()) {
00863 ClauseHead* o = (ClauseHead*)integrated_[intEnd_];
00864 integrated_[intEnd_] = h;
00865 assert(o);
00866 if (!ctrl_->integrateUseHeuristic() || o->locked(*solver_) || o->activity().activity() > 0) {
00867 solver_->addLearnt(o, o->size(), Constraint_t::learnt_other);
00868 }
00869 else {
00870 o->destroy(solver_, true);
00871 solver_->stats.removeIntegrated();
00872 }
00873 }
00874 else {
00875 integrated_.push_back(h);
00876 }
00877 if (++intEnd_ >= ctrl_->integrateGrace()) {
00878 intEnd_ = 0;
00879 }
00880 }
00882
00884 GlobalQueue::GlobalQueue(const Policy& p, uint32 maxT, uint32 topo) : Distributor(p), queue_(0) {
00885 assert(maxT <= ParallelSolveOptions::supportedSolvers());
00886 queue_ = new Queue(maxT);
00887 threadId_ = new ThreadInfo[maxT];
00888 for (uint32 i = 0; i != maxT; ++i) {
00889 threadId_[i].id = queue_->addThread();
00890 threadId_[i].peerMask = populatePeerMask(topo, maxT, i);
00891 }
00892 }
00893 GlobalQueue::~GlobalQueue() {
00894 release();
00895 }
00896 void GlobalQueue::release() {
00897 if (queue_) {
00898 for (uint32 i = 0; i != queue_->maxThreads(); ++i) {
00899 Queue::ThreadId& id = getThreadId(i);
00900 for (DistPair n; queue_->tryConsume(id, n); ) {
00901 if (n.sender != i) { n.lits->release(); }
00902 }
00903 }
00904 delete queue_;
00905 queue_ = 0;
00906 delete [] threadId_;
00907 }
00908 }
00909 uint64 GlobalQueue::populateFromCube(uint32 numThreads, uint32 myId, bool ext) const {
00910 uint32 n = numThreads;
00911 uint32 k = 1;
00912 for (uint32 i = n / 2; i > 0; i /= 2, k *= 2) { }
00913 uint64 res = 0, x = 1;
00914 for (uint32 m = 1; m <= k; m *= 2) {
00915 uint32 i = m ^ myId;
00916 if (i < n) { res |= (x << i); }
00917 else if (ext && k != m) { res |= (x << (i^k)); }
00918 }
00919 if (ext) {
00920 uint32 s = k ^ myId;
00921 for(uint32 m = 1; m < k && s >= n; m *= 2) {
00922 uint32 i = m ^ s;
00923 if (i < n) { res |= (x << i); }
00924 }
00925 }
00926 assert( (res & (x<<myId)) == 0 );
00927 return res;
00928 }
00929 uint64 GlobalQueue::populatePeerMask(uint32 topo, uint32 maxT, uint32 id) const {
00930 switch (topo) {
00931 case ParallelSolveOptions::Integration::topo_ring: {
00932 uint32 prev = id > 0 ? id - 1 : maxT - 1;
00933 uint32 next = (id + 1) % maxT;
00934 return Distributor::mask(prev) | Distributor::mask(next);
00935 }
00936 case ParallelSolveOptions::Integration::topo_cube: return populateFromCube(maxT, id, false);
00937 case ParallelSolveOptions::Integration::topo_cubex: return populateFromCube(maxT, id, true);
00938 default: return Distributor::initSet(maxT) ^ Distributor::mask(id);
00939 }
00940 }
00941
00942 void GlobalQueue::publish(const Solver& s, SharedLiterals* n) {
00943 assert(n->refCount() >= (queue_->maxThreads()-1));
00944 queue_->publish(DistPair(s.id(), n), getThreadId(s.id()));
00945 }
00946 uint32 GlobalQueue::receive(const Solver& in, SharedLiterals** out, uint32 maxn) {
00947 uint32 r = 0;
00948 Queue::ThreadId& id = getThreadId(in.id());
00949 uint64 peers = getPeerMask(in.id());
00950 for (DistPair n; r != maxn && queue_->tryConsume(id, n); ) {
00951 if (n.sender != in.id()) {
00952 if (inSet(peers, n.sender)) { out[r++] = n.lits; }
00953 else if (n.lits->size() == 1){ out[r++] = n.lits; }
00954 else { n.lits->release(); }
00955 }
00956 }
00957 return r;
00958 }
00959 } }
00960
00961 #endif