00001
00011 #include "PO_ParallelETUCT.hh"
00012 #include <algorithm>
00013
00014 #include <sys/time.h>
00015
00016
00017 PO_ParallelETUCT::PO_ParallelETUCT(int numactions, float gamma, float rrange, float lambda,
00018 int MAX_ITER, float MAX_TIME, int MAX_DEPTH, int modelType,
00019 const std::vector<float> &fmax, const std::vector<float> &fmin,
00020 const std::vector<int> &nstatesPerDim, bool trackActual, int historySize, Random r):
00021 numactions(numactions), gamma(gamma), rrange(rrange), lambda(lambda),
00022 MAX_ITER(MAX_ITER), MAX_TIME(MAX_TIME),
00023 MAX_DEPTH(MAX_DEPTH), modelType(modelType), statesPerDim(nstatesPerDim),
00024 trackActual(trackActual), HISTORY_SIZE(historySize),
00025 HISTORY_FL_SIZE(historySize*numactions)
00026 {
00027 rng = r;
00028
00029 nstates = 0;
00030 nsaved = 0;
00031 nactions = 0;
00032 lastUpdate = -1;
00033
00034 seedMode = false;
00035 timingType = true;
00036
00037 previnfo = NULL;
00038 model = NULL;
00039 planTime = getSeconds();
00040 initTime = getSeconds();
00041 setTime = getSeconds();
00042
00043 PLANNERDEBUG = false;
00044 POLICYDEBUG = false;
00045 ACTDEBUG = false;
00046 MODELDEBUG = false;
00047 UCTDEBUG = false;
00048 PTHREADDEBUG = false;
00049 ATHREADDEBUG = false;
00050 MTHREADDEBUG = false;
00051 TIMINGDEBUG = false;
00052 REALSTATEDEBUG = false;
00053 HISTORYDEBUG = false;
00054
00055 if (statesPerDim[0] > 0){
00056 cout << "Planner Parallel ETUCT using discretization of " << statesPerDim[0] << endl;
00057 }
00058 if (trackActual){
00059 cout << "Parallel ETUCT tracking real state values" << endl;
00060 }
00061
00062 featmax = fmax;
00063 featmin = fmin;
00064
00065 pthread_mutex_init(&update_mutex, NULL);
00066 pthread_mutex_init(&nactions_mutex, NULL);
00067 pthread_mutex_init(&history_mutex, NULL);
00068 pthread_mutex_init(&plan_state_mutex, NULL);
00069 pthread_mutex_init(&statespace_mutex, NULL);
00070 pthread_mutex_init(&model_mutex, NULL);
00071 pthread_mutex_init(&list_mutex, NULL);
00072 pthread_cond_init(&list_cond, NULL);
00073
00074
00075 actualPlanState = std::vector<float>(featmax.size());
00076 discPlanState = NULL;
00077 modelThreadStarted = false;
00078 planThreadStarted = false;
00079 expList.clear();
00080
00081 if (HISTORY_SIZE == 0){
00082 saHistory.push_back(0.0);
00083 }
00084 else {
00085 if (HISTORYDEBUG) {
00086 cout << "History size of " << HISTORY_SIZE
00087 << " float size of " << HISTORY_FL_SIZE
00088 << " with state size: " << fmin.size()
00089 << " and numact: " << numactions << endl;
00090 }
00091 for (int i = 0; i < HISTORY_FL_SIZE; i++){
00092 saHistory.push_back(0.0);
00093 }
00094 }
00095
00096
00097 expfile.initFile("experiences.bin", featmax.size());
00098 }
00099
00100 PO_ParallelETUCT::~PO_ParallelETUCT() {
00101
00102
00103
00104
00105
00106 pthread_detach(planThread);
00107 pthread_detach(modelThread);
00108
00109 pthread_cancel(planThread);
00110 pthread_cancel(modelThread);
00111
00112
00113
00114
00115
00116
00117
00118
00119 pthread_mutex_lock(&plan_state_mutex);
00120 pthread_mutex_lock(&statespace_mutex);
00121 pthread_mutex_lock(&model_mutex);
00122 pthread_mutex_lock(&list_mutex);
00123 pthread_mutex_lock(&history_mutex);
00124
00125
00126 expList.clear();
00127
00128 for (std::map<state_t, state_info>::iterator i = statedata.begin();
00129 i != statedata.end(); i++){
00130
00131
00132
00133 state_info* info = &((*i).second);
00134
00135 deleteInfo(info);
00136 }
00137
00138 featmax.clear();
00139 featmin.clear();
00140
00141 statespace.clear();
00142 statedata.clear();
00143
00144 pthread_mutex_unlock(&plan_state_mutex);
00145 pthread_mutex_unlock(&statespace_mutex);
00146 pthread_mutex_unlock(&model_mutex);
00147 pthread_mutex_unlock(&list_mutex);
00148 pthread_mutex_unlock(&history_mutex);
00149
00150 }
00151
00152 void PO_ParallelETUCT::setModel(MDPModel* m){
00153
00154 model = m;
00155
00156 }
00157
00158
00160
00162
00163
00164
00165 bool PO_ParallelETUCT::updateModelWithExperience(const std::vector<float> &laststate,
00166 int lastact,
00167 const std::vector<float> &currstate,
00168 float reward, bool term){
00169
00170
00171
00172
00173
00174
00175
00176
00177
00178 if (!timingType)
00179 planTime = getSeconds();
00180 initTime = getSeconds();
00181
00182
00183 state_t last = NULL;
00184
00185
00186 if (HISTORY_SIZE > 0){
00187 std::vector<float> modState = laststate;
00188 if (HISTORYDEBUG) {
00189 cout << "Original state vector (size " << modState.size() << ": " << modState[0];
00190 for (unsigned i = 1; i < modState.size(); i++){
00191 cout << "," << modState[i];
00192 }
00193 cout << endl;
00194 }
00195
00196 pthread_mutex_lock(&history_mutex);
00197 for (int i = 0; i < HISTORY_FL_SIZE; i++){
00198 modState.push_back(saHistory[i]);
00199 }
00200 pthread_mutex_unlock(&history_mutex);
00201
00202 if (HISTORYDEBUG) {
00203 cout << "New state vector (size " << modState.size() << ": " << modState[0];
00204 for (unsigned i = 1; i < modState.size(); i++){
00205 cout << "," << modState[i];
00206 }
00207 cout << endl;
00208 }
00209
00210 last = canonicalize(modState);
00211
00212 if (!seedMode){
00213
00214
00215
00216
00217
00218
00219
00220 pthread_mutex_lock(&history_mutex);
00221 for (int i = 0; i < numactions; i++){
00222 if (i == lastact)
00223 saHistory.push_back(1.0);
00224 else
00225 saHistory.push_back(0.0);
00226 saHistory.pop_front();
00227 }
00228 if (HISTORYDEBUG) {
00229 cout << "New history vector (size " << saHistory.size() << ": " << saHistory[0];
00230 for (unsigned i = 1; i < saHistory.size(); i++){
00231 cout << "," << saHistory[i];
00232 }
00233 cout << endl;
00234 }
00235 pthread_mutex_unlock(&history_mutex);
00236 }
00237 }
00238
00239
00240 else {
00241
00242
00243 last = canonicalize(laststate);
00244 }
00245
00246 prevstate = last;
00247 prevact = lastact;
00248
00249
00250 pthread_mutex_lock(&statespace_mutex);
00251 previnfo = &(statedata[last]);
00252 pthread_mutex_unlock(&statespace_mutex);
00253
00254 if (MODELDEBUG){
00255 cout << "Update with exp from state: ";
00256 for (unsigned i = 0; i < last->size(); i++){
00257 cout << (laststate)[i] << ", ";
00258 }
00259 cout << " action: " << lastact;
00260 cout << " to state: ";
00261 for (unsigned i = 0; i < currstate.size(); i++){
00262 cout << (currstate)[i] << ", ";
00263 }
00264 cout << " and reward: " << reward << endl;
00265 }
00266
00267
00268 if (ATHREADDEBUG)
00269 cout << "*** Action thread wants list lock ***" << endl << flush;
00270 if (TIMINGDEBUG) cout << "Want list mutex, time: " << (getSeconds()-initTime) << endl;
00271 pthread_mutex_lock(&list_mutex);
00272 if (TIMINGDEBUG) cout << "got list mutex, time: " << (getSeconds()-initTime) << endl;
00273 experience e;
00274 e.s = *last;
00275 e.next = currstate;
00276 e.act = lastact;
00277 e.reward = reward;
00278 e.terminal = term;
00279
00280 expList.push_back(e);
00281
00282 if (ATHREADDEBUG || MTHREADDEBUG)
00283 cout << "added exp to list, size: " << expList.size() << endl << flush;
00284 if (TIMINGDEBUG) cout << "list updated, time: " << (getSeconds()-initTime) << endl;
00285 pthread_cond_signal(&list_cond);
00286 pthread_mutex_unlock(&list_mutex);
00287
00288
00289
00290
00291
00292
00293
00294
00295
00296 if (timingType)
00297 planTime = getSeconds();
00298
00299 if (TIMINGDEBUG) cout << "leaving updateModel, time: " << (getSeconds()-initTime) << endl;
00300
00301
00302 return false;
00303
00304 }
00305
00306 void PO_ParallelETUCT::updateStateActionFromModel(state_t s, int a, state_info* info){
00307
00308 pthread_mutex_lock(&info->statemodel_mutex);
00309 StateActionInfo* newModel = &(info->model[a]);
00310 updateStateActionHistoryFromModel(*s, a, newModel);
00311 pthread_mutex_unlock(&info->statemodel_mutex);
00312
00313 }
00314
00315 void PO_ParallelETUCT::updateStateActionHistoryFromModel(const std::vector<float> modState, int a, StateActionInfo *newModel){
00316
00317
00318
00319 pthread_mutex_lock(&model_mutex);
00320
00321 model->getStateActionInfo(modState, a, newModel);
00322
00323 pthread_mutex_lock(&nactions_mutex);
00324 newModel->frameUpdated = nactions;
00325 pthread_mutex_unlock(&nactions_mutex);
00326
00327 pthread_mutex_unlock(&model_mutex);
00328
00329 if (HISTORY_SIZE > 0){
00330
00331
00332 std::deque<float> newHistory;
00333 int stateSize = modState.size() - HISTORY_FL_SIZE;
00334
00335 if (HISTORYDEBUG) cout << "input history was: ";
00336 for (int i = 0; i < HISTORY_FL_SIZE; i++){
00337 newHistory.push_back(modState[i+stateSize]);
00338 if (HISTORYDEBUG) cout << modState[i+stateSize] << ", ";
00339 }
00340 if (HISTORYDEBUG) cout << endl;
00341
00342
00343 for (int i = 0; i < numactions; i++){
00344 if (i == a)
00345 newHistory.push_back(1.0);
00346 else
00347 newHistory.push_back(0.0);
00348 newHistory.pop_front();
00349 }
00350
00351 if (HISTORYDEBUG){
00352 cout << "act: " << a << ", new history:";
00353 for (unsigned i = 0; i < newHistory.size(); i++){
00354 cout << newHistory[i] << ", ";
00355 }
00356 cout << endl;
00357 }
00358
00359
00360 std::map< std::vector<float>, float> oldProbs = newModel->transitionProbs;
00361 newModel->transitionProbs.clear();
00362
00363 for (std::map<std::vector<float>, float>::iterator outIt
00364 = oldProbs.begin();
00365 outIt != oldProbs.end(); outIt++){
00366
00367 float prob = (*outIt).second;
00368 std::vector<float> next = (*outIt).first;
00369
00370 for (unsigned i = 0; i < newHistory.size(); i++){
00371 next.push_back(newHistory[i]);
00372 }
00373
00374 if (HISTORYDEBUG){
00375 cout << "add history onto prediction of state: ";
00376 for (unsigned i = 0; i < next.size(); i++){
00377 cout << next[i] << ", ";
00378 }
00379 cout << " with prob " << prob << endl;
00380 }
00381
00382 newModel->transitionProbs[next] = prob;
00383 }
00384 }
00385
00386
00387
00388
00389 }
00390
00391 void PO_ParallelETUCT::canonNextStates(StateActionInfo* modelInfo){
00392
00393
00394
00395 for (std::map<std::vector<float>, float>::iterator outIt
00396 = modelInfo->transitionProbs.begin();
00397 outIt != modelInfo->transitionProbs.end(); outIt++){
00398
00399 std::vector<float> nextstate = (*outIt).first;
00400 bool badState = false;
00401
00402
00403 for (unsigned j = 0; j < featmax.size(); j++){
00404 if (nextstate[j] < (featmin[j]-EPSILON)
00405 || nextstate[j] > (featmax[j]+EPSILON)){
00406
00407 badState = true;
00408 break;
00409 }
00410 }
00411
00412 if (!badState){
00413 canonicalize(nextstate);
00414 }
00415 }
00416 }
00417
00418 int PO_ParallelETUCT::getBestAction(const std::vector<float> &state){
00419
00420
00421 pthread_mutex_lock(&nactions_mutex);
00422 nactions++;
00423 pthread_mutex_unlock(&nactions_mutex);
00424
00425
00426 if (TIMINGDEBUG) cout << "getBestAction, time: " << (getSeconds()-initTime) << endl;
00427
00428
00429 pthread_mutex_lock(&history_mutex);
00430 std::vector<float> modState = state;
00431 for (int i = 0; i < HISTORY_FL_SIZE; i++){
00432 modState.push_back(saHistory[i]);
00433 }
00434 pthread_mutex_unlock(&history_mutex);
00435
00436 state_t s = canonicalize(modState);
00437
00438
00439 if (ATHREADDEBUG)
00440 cout << "*** Action thread wants plan state lock ***" << endl << flush;
00441 if (TIMINGDEBUG) cout << "want planStateMut, time: " << (getSeconds()-initTime) << endl;
00442
00443 pthread_mutex_lock(&(plan_state_mutex));
00444 if (TIMINGDEBUG) cout << "got planStateMut, time: " << (getSeconds()-initTime) << endl;
00445
00446 actualPlanState = modState;
00447 discPlanState = s;
00448 setTime = getSeconds();
00449
00450 if (ATHREADDEBUG){
00451 cout << "Set planning state as: ";
00452 for (unsigned i = 0; i < modState.size(); i++){
00453 cout << modState[i] << ", ";
00454 }
00455 cout << endl << flush;
00456 }
00457
00458
00459 pthread_mutex_unlock(&(plan_state_mutex));
00460 if (TIMINGDEBUG) cout << "set planState, time: " << (getSeconds()-initTime) << endl;
00461
00462
00463 pthread_mutex_lock(&statespace_mutex);
00464 state_info* info = &(statedata[s]);
00465 pthread_mutex_unlock(&statespace_mutex);
00466
00467
00468
00469
00470
00471
00472
00473 while (((getSeconds()- initTime) < MAX_TIME)){
00474 if (TIMINGDEBUG)
00475 cout << "waiting for time: " << (getSeconds()-initTime) << endl;
00476
00477 pthread_yield();
00478 }
00479
00480 if (TIMINGDEBUG) cout << "time up: " << (getSeconds()-initTime) << endl;
00481
00482 if (TIMINGDEBUG && (getSeconds()-initTime) > 0.15) cout << "**********" << endl;
00483
00484 pthread_mutex_lock(&info->stateinfo_mutex);
00485
00486
00487 std::vector<float> &Q = info->Q;
00488
00489
00490 if (ATHREADDEBUG) {
00491 if (previnfo != NULL)
00492 cout << " ... now " << previnfo->uctVisits << " times." << endl;
00493 cout << "Getting best action from state ";
00494 for (unsigned i = 0; i < s->size(); i++){
00495 cout << (*s)[i] << ", ";
00496 }
00497 cout << " sampled " << info->uctVisits << " times.";
00498 }
00499
00500
00501 const std::vector<float>::iterator a =
00502 random_max_element(Q.begin(), Q.end());
00503 int act = a - Q.begin();
00504
00505 if (TIMINGDEBUG) cout << "got action: " << (getSeconds()-initTime) << endl;
00506
00507 pthread_mutex_unlock(&info->stateinfo_mutex);
00508
00509
00510 return act;
00511 }
00512
00513
00514
00515
00516
00517
00518 void PO_ParallelETUCT::planOnNewModel(){
00519
00520
00521
00522 if (!modelThreadStarted){
00523 modelThreadStarted = true;
00524 pthread_create(&modelThread, NULL, poParallelModelLearningStart, this);
00525 }
00526
00527 if (!planThreadStarted){
00528 planThreadStarted = true;
00529 pthread_create(&(planThread), NULL, poParallelSearchStart, this);
00530 }
00531
00532 }
00533
00534 void* poParallelModelLearningStart(void* arg){
00535 cout << "Start model learning thread" << endl << flush;
00536 PO_ParallelETUCT* pe = reinterpret_cast<PO_ParallelETUCT*>(arg);
00537 while(true){
00538 pe->parallelModelLearning();
00539
00540
00541
00542
00543
00544
00545 }
00546 return NULL;
00547 }
00548
00549 void PO_ParallelETUCT::parallelModelLearning(){
00550
00551
00552
00553 pthread_mutex_lock(&list_mutex);
00554 while (expList.size() == 0){
00555 pthread_cond_wait(&list_cond,&list_mutex);
00556 }
00557 pthread_mutex_unlock(&list_mutex);
00558
00559
00560 std::vector<experience> updateList;
00561 if (MTHREADDEBUG) cout << " *** Model thread wants list lock ***" << endl << flush;
00562 pthread_mutex_lock(&list_mutex);
00563 updateList = expList;
00564 expList.clear();
00565 if (MTHREADDEBUG) cout << " *** Model thread done with list lock ***" << endl << flush;
00566 pthread_mutex_unlock(&list_mutex);
00567
00568
00569
00570
00571
00572
00573
00574
00575
00576
00577
00578
00579 modelcopy = model->getCopy();
00580
00581
00582
00583 bool modelChanged = modelcopy->updateWithExperiences(updateList);
00584
00585
00586 pthread_mutex_lock(&model_mutex);
00587
00588
00589 delete model;
00590 model = modelcopy;
00591 if (MTHREADDEBUG) cout << " Model updated" << endl << flush;
00592
00593 pthread_mutex_unlock(&model_mutex);
00594
00595
00596
00597
00598 if (modelChanged) resetAndUpdateStateActions();
00599
00600 pthread_yield();
00601
00602
00603 }
00604
00605
00606
00607
00608 void PO_ParallelETUCT::resetAndUpdateStateActions(){
00609
00610 const int MIN_VISITS = 10;
00611
00612 pthread_mutex_lock(&nactions_mutex);
00613 int updateTime = nactions;
00614 pthread_mutex_unlock(&nactions_mutex);
00615
00616
00617
00618 pthread_mutex_lock(&statespace_mutex);
00619
00620 for (std::set<std::vector<float> >::iterator i = statespace.begin();
00621 i != statespace.end(); i++){
00622 pthread_mutex_unlock(&statespace_mutex);
00623
00624 state_t s = canonicalize(*i);
00625
00626 if (MTHREADDEBUG) cout << " *** Model thread wants search lock ***" << endl;
00627
00628 if (MTHREADDEBUG) cout << " *** Model thread got search lock " << endl;
00629
00630 pthread_mutex_lock(&statespace_mutex);
00631 state_info* info = &(statedata[s]);
00632 pthread_mutex_unlock(&statespace_mutex);
00633
00634 pthread_mutex_lock(&info->stateinfo_mutex);
00635
00636 if (info->uctVisits > (MIN_VISITS * numactions))
00637 info->uctVisits = MIN_VISITS * numactions;
00638
00639 for (int j = 0; j < numactions; j++){
00640 if (info->needsUpdate){
00641 updateStateActionFromModel(s, j, info);
00642 }
00643 if (info->uctActions[j] > MIN_VISITS)
00644 info->uctActions[j] = MIN_VISITS;
00645 }
00646 info->needsUpdate = false;
00647 pthread_mutex_unlock(&info->stateinfo_mutex);
00648
00649 pthread_yield();
00650
00651 pthread_mutex_lock(&statespace_mutex);
00652
00653 }
00654 pthread_mutex_unlock(&statespace_mutex);
00655
00656 pthread_mutex_lock(&update_mutex);
00657 lastUpdate = updateTime;
00658 pthread_mutex_unlock(&update_mutex);
00659
00660 }
00661
00662
00663
00664
00666
00668
00669 PO_ParallelETUCT::state_t PO_ParallelETUCT::canonicalize(const std::vector<float> &s) {
00670 if (PLANNERDEBUG) cout << "canonicalize(s = " << s[0] << ", "
00671 << s[1] << ")" << endl;
00672
00673
00674 std::vector<float> s2;
00675 if (statesPerDim[0] > 0){
00676 s2 = discretizeState(s);
00677 } else {
00678 s2 = s;
00679 }
00680
00681 pthread_mutex_lock(&statespace_mutex);
00682
00683
00684 const std::pair<std::set<std::vector<float> >::iterator, bool> result =
00685 statespace.insert(s2);
00686 state_t retval = &*result.first;
00687
00688
00689 if (result.second) {
00690 state_info* info = &(statedata[retval]);
00691 int id = nstates++;
00692 pthread_mutex_unlock(&statespace_mutex);
00693 initStateInfo(retval, info, id);
00694 } else {
00695 pthread_mutex_unlock(&statespace_mutex);
00696 }
00697
00698 return retval;
00699 }
00700
00701
00702
00703 void PO_ParallelETUCT::initStateInfo(state_t s, state_info* info, int id){
00704
00705
00706
00707 pthread_mutex_init(&info->statemodel_mutex, NULL);
00708 pthread_mutex_init(&info->stateinfo_mutex, NULL);
00709
00710 pthread_mutex_lock(&info->stateinfo_mutex);
00711
00712
00713
00714 pthread_mutex_lock(&info->statemodel_mutex);
00715 info->model = new StateActionInfo[numactions];
00716 pthread_mutex_unlock(&info->statemodel_mutex);
00717
00718
00719
00720 info->id = id;
00721 if (PLANNERDEBUG) cout << " id = " << info->id << endl;
00722
00723
00724 info->Q.resize(numactions, 0);
00725 info->uctActions.resize(numactions, 1);
00726 info->uctVisits = 1;
00727 info->visited = 0;
00728
00729 for (int i = 0; i < numactions; i++){
00730 info->Q[i] = rng.uniform(0, 0.01);
00731 }
00732
00733 info->needsUpdate = true;
00734
00735 pthread_mutex_unlock(&info->stateinfo_mutex);
00736
00737
00738
00739 }
00740
00741
00742 void PO_ParallelETUCT::printStates(){
00743
00744 pthread_mutex_lock(&statespace_mutex);
00745 for (std::set< std::vector<float> >::iterator i = statespace.begin();
00746 i != statespace.end(); i++){
00747 pthread_mutex_unlock(&statespace_mutex);
00748
00749 state_t s = canonicalize(*i);
00750
00751 pthread_mutex_lock(&statespace_mutex);
00752 state_info* info = &(statedata[s]);
00753 pthread_mutex_unlock(&statespace_mutex);
00754
00755 cout << "State " << info->id << ": ";
00756 for (unsigned j = 0; j < s->size(); j++){
00757 cout << (*s)[j] << ", ";
00758 }
00759 cout << endl;
00760
00761 pthread_mutex_lock(&info->stateinfo_mutex);
00762
00763 for (int act = 0; act < numactions; act++){
00764 cout << " Q: " << info->Q[act] << endl;
00765
00766 }
00767
00768 pthread_mutex_unlock(&info->stateinfo_mutex);
00769
00770 pthread_mutex_lock(&statespace_mutex);
00771
00772 }
00773 pthread_mutex_unlock(&statespace_mutex);
00774
00775 }
00776
00777
00778 void PO_ParallelETUCT::deleteInfo(state_info* info){
00779
00780 pthread_mutex_lock(&info->statemodel_mutex);
00781 delete [] info->model;
00782 pthread_mutex_unlock(&info->statemodel_mutex);
00783
00784 }
00785
00786
00787
00788 double PO_ParallelETUCT::getSeconds(){
00789 struct timezone tz;
00790 timeval timeT;
00791 gettimeofday(&timeT, &tz);
00792 return timeT.tv_sec + (timeT.tv_usec / 1000000.0);
00793 }
00794
00795
00796 float PO_ParallelETUCT::uctSearch(const std::vector<float> &actS, state_t discS, int depth){
00797 if (UCTDEBUG){
00798 cout << " uctSearch state ";
00799 for (unsigned i = 0; i < actS.size(); i++){
00800 cout << actS[i] << ", ";
00801 }
00802 cout << " at depth " << depth << endl;
00803 }
00804
00805 pthread_mutex_lock(&statespace_mutex);
00806 state_info* info = &(statedata[discS]);
00807 pthread_mutex_unlock(&statespace_mutex);
00808
00809
00810
00811
00812
00813
00814 if (depth > MAX_DEPTH){
00815 pthread_mutex_lock(&info->stateinfo_mutex);
00816
00817
00818 std::vector<float>::iterator maxAct =
00819 std::max_element(info->Q.begin(),
00820 info->Q.end());
00821 float maxval = *maxAct;
00822
00823 if (UCTDEBUG)
00824 cout << "Terminated after depth: " << depth
00825
00826 << " Q: " << maxval
00827 << " visited: " << info->visited << endl;
00828
00829 pthread_mutex_unlock(&info->stateinfo_mutex);
00830
00831 return maxval;
00832 }
00833
00834
00835 int action = selectUCTAction(info);
00836
00837
00838
00839 float reward = 0;
00840 bool term = false;
00841
00842 pthread_mutex_lock(&info->stateinfo_mutex);
00843
00844 float learnRate;
00845
00846
00847
00848 learnRate = 10.0 / (info->uctActions[action] + 10.0);
00849
00850
00851
00852
00853
00854
00855 info->needsUpdate = true;
00856
00857 pthread_mutex_unlock(&info->stateinfo_mutex);
00858
00859 std::vector<float> actualNext = simulateNextState(actS, discS, info, action, &reward, &term);
00860
00861
00862 if (term){
00863
00864 if (UCTDEBUG) cout << " Terminated on exploration condition" << endl;
00865 pthread_mutex_lock(&info->stateinfo_mutex);
00866
00867 info->Q[action] += learnRate * (reward - info->Q[action]);
00868 info->uctVisits++;
00869 info->uctActions[action]++;
00870
00871 if (UCTDEBUG)
00872 cout << " Depth: " << depth << " Selected action " << action
00873 << " r: " << reward
00874 << " StateVisits: " << info->uctVisits
00875 << " ActionVisits: " << info->uctActions[action] << endl;
00876
00877 pthread_mutex_unlock(&info->stateinfo_mutex);
00878
00879 return reward;
00880 }
00881
00882
00883 state_t discNext = canonicalize(actualNext);
00884
00885 if (UCTDEBUG)
00886 cout << " Depth: " << depth << " Selected action " << action
00887 << " r: " << reward << endl;
00888
00889 pthread_mutex_lock(&info->stateinfo_mutex);
00890 info->visited++;
00891 pthread_mutex_unlock(&info->stateinfo_mutex);
00892
00893
00894 float newQ = reward + gamma * uctSearch(actualNext, discNext, depth+1);
00895
00896 pthread_mutex_lock(&info->stateinfo_mutex);
00897
00898 if (info->visited == 1){
00899
00900
00901 info->Q[action] += learnRate * (newQ - info->Q[action]);
00902 info->uctVisits++;
00903 info->uctActions[action]++;
00904
00905 if (UCTDEBUG)
00906 cout << " Depth: " << depth << " newQ: " << newQ
00907 << " StateVisits: " << info->uctVisits
00908 << " ActionVisits: " << info->uctActions[action] << endl;
00909
00910 if (lambda < 1.0){
00911
00912
00913 std::vector<float>::iterator maxAct =
00914 std::max_element(info->Q.begin(),
00915 info->Q.end());
00916 float maxval = *maxAct;
00917
00918 if (UCTDEBUG)
00919 cout << " Replacing newQ: " << newQ;
00920
00921
00922 newQ = (lambda * newQ) + ((1.0-lambda) * maxval);
00923
00924 if (UCTDEBUG)
00925 cout << " with wAvg: " << newQ << endl;
00926 }
00927
00928 }
00929
00930 info->visited--;
00931 pthread_mutex_unlock(&info->stateinfo_mutex);
00932
00933
00934 return newQ;
00935
00936 }
00937
00938
00939 int PO_ParallelETUCT::selectUCTAction(state_info* info){
00940
00941
00942 pthread_mutex_lock(&info->stateinfo_mutex);
00943
00944 std::vector<float> &Q = info->Q;
00945
00946 if (info->uctActions.size() < (unsigned)numactions){
00947 cout << "ERROR: uctActions has size " << info->uctActions.size() << endl << flush;
00948 info->uctActions.resize(numactions);
00949 }
00950
00951
00952 float rewardBound = rrange;
00953 if (rewardBound < 1.0)
00954 rewardBound = 1.0;
00955 rewardBound /= (1.0 - gamma);
00956 if (UCTDEBUG) cout << "Reward bound: " << rewardBound << endl;
00957
00958 std::vector<float> uctQ(numactions, 0.0);
00959
00960 for (int i = 0; i < numactions; i++){
00961
00962
00963 uctQ[i] = Q[i] +
00964 rewardBound * 2.0 * sqrt(log((float)info->uctVisits) /
00965 (float)info->uctActions[i]);
00966
00967 if (UCTDEBUG)
00968 cout << " Action: " << i << " Q: " << Q[i]
00969 << " visits: " << info->uctActions[i]
00970 << " value: " << uctQ[i] << endl;
00971 }
00972
00973
00974 std::vector<float>::iterator maxAct =
00975 max_element(uctQ.begin(), uctQ.end());
00976 float maxval = *maxAct;
00977 int act = maxAct - uctQ.begin();
00978
00979 if (UCTDEBUG)
00980 cout << " Selected " << act << " val: " << maxval << endl;
00981
00982 pthread_mutex_unlock(&info->stateinfo_mutex);
00983
00984 return act;
00985
00986 }
00987
00988 std::vector<float> PO_ParallelETUCT::simulateNextState(const std::vector<float> &actualState, state_t discState, state_info* info, int action, float* reward, bool* term){
00989
00990
00991
00992
00993 pthread_mutex_lock(&info->statemodel_mutex);
00994 StateActionInfo* modelInfo = NULL;
00995 modelInfo = &(info->model[action]);
00996 pthread_mutex_lock(&update_mutex);
00997 bool upToDate = modelInfo->frameUpdated >= lastUpdate;
00998 pthread_mutex_unlock(&update_mutex);
00999
01000 if (!upToDate){
01001 updateStateActionHistoryFromModel(*discState, action, modelInfo);
01002 }
01003
01004 *reward = modelInfo->reward;
01005 *term = (rng.uniform() < modelInfo->termProb);
01006
01007 if (*term){
01008 pthread_mutex_unlock(&info->statemodel_mutex);
01009 return actualState;
01010 }
01011
01012 float randProb = rng.uniform();
01013
01014 float probSum = 0.0;
01015 std::vector<float> nextstate;
01016
01017 if (REALSTATEDEBUG) cout << "randProb: " << randProb << " numNext: " << modelInfo->transitionProbs.size() << endl;
01018
01019 if (modelInfo->transitionProbs.size() == 0)
01020 nextstate = actualState;
01021
01022 for (std::map<std::vector<float>, float>::iterator outIt
01023 = modelInfo->transitionProbs.begin();
01024 outIt != modelInfo->transitionProbs.end(); outIt++){
01025
01026 float prob = (*outIt).second;
01027 probSum += prob;
01028 if (REALSTATEDEBUG) cout << randProb << ", " << probSum << ", " << prob << endl;
01029
01030 if (randProb <= probSum){
01031 nextstate = (*outIt).first;
01032 if (REALSTATEDEBUG) cout << "selected state " << randProb << ", " << probSum << ", " << prob << endl;
01033 break;
01034 }
01035 }
01036
01037 pthread_mutex_unlock(&info->statemodel_mutex);
01038
01039 if (trackActual){
01040
01041
01042
01043 std::vector<float> relChange = subVec(nextstate, *discState);
01044
01045
01046 nextstate = addVec(actualState, relChange);
01047
01048
01049 }
01050
01051
01052 for (unsigned j = 0; j < featmin.size(); j++){
01053 if (nextstate[j] < (featmin[j]-EPSILON)
01054 || nextstate[j] > (featmax[j]+EPSILON)){
01055
01056 if (HISTORY_SIZE == 0) return actualState;
01057
01058
01059 std::vector<float> modState = actualState;
01060 int stateOnlySize = modState.size()-HISTORY_FL_SIZE;
01061 for (int i = stateOnlySize; i < (int)modState.size(); i++){
01062 if (action == (i - stateOnlySize))
01063 modState[i] = 1;
01064 else
01065 modState[i] = 0;
01066 }
01067 return modState;
01068
01069 }
01070 }
01071
01072
01073 return nextstate;
01074
01075 }
01076
01077 std::vector<float> PO_ParallelETUCT::selectRandomState(){
01078
01079 pthread_mutex_lock(&statespace_mutex);
01080 if (statespace.size() == 0){
01081 pthread_mutex_unlock(&statespace_mutex);
01082 return std::vector<float>(featmax.size());
01083 }
01084 pthread_mutex_unlock(&statespace_mutex);
01085
01086
01087 int index = 0;
01088 std::vector<float> state;
01089
01090 pthread_mutex_lock(&statespace_mutex);
01091 if (statespace.size() > 1){
01092 index = rng.uniformDiscrete(0, statespace.size()-1);
01093 }
01094 pthread_mutex_unlock(&statespace_mutex);
01095
01096 int cnt = 0;
01097
01098 if (PTHREADDEBUG) cout << "*** Planning thread wants search lock (randomstate) ***" << endl << flush;
01099
01100 pthread_mutex_lock(&statespace_mutex);
01101 for (std::set<std::vector<float> >::iterator i = statespace.begin();
01102 i != statespace.end(); i++, cnt++){
01103 if (cnt == index){
01104 state = *i;
01105 break;
01106 }
01107 }
01108 pthread_mutex_unlock(&statespace_mutex);
01109
01110 return state;
01111 }
01112
01113
01114 void* poParallelSearchStart(void* arg){
01115 PO_ParallelETUCT* pe = reinterpret_cast<PO_ParallelETUCT*>(arg);
01116
01117 cout << "start parallel uct planning search thread" << endl << flush;
01118
01119 while(true){
01120 pe->parallelSearch();
01121 }
01122
01123 return NULL;
01124 }
01125
01126 void PO_ParallelETUCT::parallelSearch(){
01127
01128 std::vector<float> actS;
01129 state_t discS;
01130
01131
01132 if (PTHREADDEBUG) {
01133 cout << "*** Planning thread wants planning state lock ***" << endl << flush;
01134 }
01135 pthread_mutex_lock(&(plan_state_mutex));
01136
01137
01138 actS = actualPlanState;
01139 discS = discPlanState;
01140
01141
01142 if (discS == NULL){
01143 pthread_mutex_unlock(&(plan_state_mutex));
01144 return;
01145 }
01146
01147 if (PTHREADDEBUG){
01148 pthread_mutex_lock(&statespace_mutex);
01149 cout << " uct search from state s ("
01150 << statedata[discS].uctVisits <<"): ";
01151 pthread_mutex_unlock(&statespace_mutex);
01152
01153 for (unsigned i = 0; i < discS->size(); i++){
01154 cout << (*discS)[i] << ", ";
01155 }
01156 cout << endl << flush;
01157 }
01158
01159
01160 pthread_mutex_unlock(&(plan_state_mutex));
01161
01162 if (PTHREADDEBUG) cout << "*** Planning thread wants search lock ***" << endl;
01163 uctSearch(actS, discS, 0);
01164
01165 pthread_yield();
01166
01167 }
01168
01169
01170
01171 void PO_ParallelETUCT::savePolicy(const char* filename){
01172
01173 ofstream policyFile(filename, ios::out | ios::binary | ios::trunc);
01174
01175
01176 int fsize = featmin.size();
01177 policyFile.write((char*)&fsize, sizeof(int));
01178
01179
01180 policyFile.write((char*)&numactions, sizeof(int));
01181
01182
01183 pthread_mutex_lock(&statespace_mutex);
01184
01185 for (std::set< std::vector<float> >::iterator i = statespace.begin();
01186 i != statespace.end(); i++){
01187 pthread_mutex_unlock(&statespace_mutex);
01188
01189 state_t s = canonicalize(*i);
01190
01191 pthread_mutex_lock(&statespace_mutex);
01192 state_info* info = &(statedata[s]);
01193 pthread_mutex_unlock(&statespace_mutex);
01194
01195
01196 policyFile.write((char*)&((*i)[0]), sizeof(float)*fsize);
01197
01198
01199 pthread_mutex_lock(&info->stateinfo_mutex);
01200 policyFile.write((char*)&(info->Q[0]), sizeof(float)*numactions);
01201 pthread_mutex_unlock(&info->stateinfo_mutex);
01202
01203 pthread_mutex_lock(&statespace_mutex);
01204 }
01205 pthread_mutex_unlock(&statespace_mutex);
01206
01207 policyFile.close();
01208 }
01209
01210
01211
01212 void PO_ParallelETUCT::loadPolicy(const char* filename){
01213
01214 ifstream policyFile(filename, ios::in | ios::binary);
01215
01216
01217 int fsize;
01218 policyFile.read((char*)&fsize, sizeof(int));
01219 cout << "Numfeats loaded: " << fsize << endl << flush;
01220
01221
01222 int nact;
01223 policyFile.read((char*)&nact, sizeof(int));
01224 cout << "nact loaded: " << nact << endl << flush;
01225 cout << " numactions: " << numactions << endl << flush;
01226
01227 if (nact != numactions){
01228 cout << "this policy is not valid loaded nact: " << nact
01229 << " was told: " << numactions << endl << flush;
01230 exit(-1);
01231 }
01232
01233
01234 while(!policyFile.eof()){
01235 std::vector<float> state(fsize, 0.0);
01236
01237
01238 policyFile.read((char*)&(state[0]), sizeof(float)*fsize);
01239
01240
01241
01242
01243
01244 state_t s = canonicalize(state);
01245
01246 pthread_mutex_lock(&statespace_mutex);
01247 state_info* info = &(statedata[s]);
01248 pthread_mutex_unlock(&statespace_mutex);
01249
01250 if (policyFile.eof()) break;
01251
01252
01253 pthread_mutex_lock(&info->stateinfo_mutex);
01254
01255 policyFile.read((char*)&(info->Q[0]), sizeof(float)*numactions);
01256
01257 info->uctVisits = numactions * 100;
01258
01259 for (int j = 0; j < numactions; j++){
01260 info->uctActions[j] = 100;
01261 }
01262
01263 info->needsUpdate = true;
01264
01265 pthread_mutex_unlock(&info->stateinfo_mutex);
01266
01267
01268
01269
01270
01271
01272
01273 }
01274
01275 policyFile.close();
01276 cout << "Policy loaded!!!" << endl << flush;
01277 }
01278
01279 void PO_ParallelETUCT::logValues(ofstream *of, int xmin, int xmax, int ymin, int ymax){
01280 std::vector<float> state(2, 0.0);
01281 for (int i = xmin ; i < xmax; i++){
01282 for (int j = ymin; j < ymax; j++){
01283 state[0] = j;
01284 state[1] = i;
01285 state_t s = canonicalize(state);
01286
01287 pthread_mutex_lock(&statespace_mutex);
01288 state_info* info = &(statedata[s]);
01289 pthread_mutex_unlock(&statespace_mutex);
01290
01291 pthread_mutex_lock(&info->stateinfo_mutex);
01292
01293 std::vector<float> &Q_s = info->Q;
01294 const std::vector<float>::iterator max =
01295 random_max_element(Q_s.begin(), Q_s.end());
01296 *of << (*max) << ",";
01297
01298 pthread_mutex_unlock(&info->stateinfo_mutex);
01299
01300 }
01301 }
01302 }
01303
01304
01305
01306
01307 std::vector<float> PO_ParallelETUCT::discretizeState(const std::vector<float> &s){
01308 std::vector<float> ds(s.size());
01309
01310 for (unsigned i = 0; i < statesPerDim.size(); i++){
01311
01312
01313
01314
01315
01316 float factor = (featmax[i] - featmin[i]) / (float)statesPerDim[i];
01317 int bin = 0;
01318 if (s[i] > 0){
01319 bin = (int)((s[i]+factor/2) / factor);
01320 } else {
01321 bin = (int)((s[i]-factor/2) / factor);
01322 }
01323
01324 ds[i] = factor*bin;
01325
01326
01327 }
01328
01329 for (unsigned i = statesPerDim.size(); i < s.size(); i++){
01330 ds[i] = s[i];
01331 }
01332
01333 return ds;
01334 }
01335
01336
01337 std::vector<float> PO_ParallelETUCT::addVec(const std::vector<float> &a, const std::vector<float> &b){
01338 if (a.size() != b.size())
01339 cout << "ERROR: add vector sizes wrong " << a.size() << ", " << b.size() << endl;
01340
01341 std::vector<float> c(a.size(), 0.0);
01342 for (unsigned i = 0; i < a.size(); i++){
01343 c[i] = a[i] + b[i];
01344 }
01345
01346 return c;
01347 }
01348
01349 std::vector<float> PO_ParallelETUCT::subVec(const std::vector<float> &a, const std::vector<float> &b){
01350 if (a.size() != b.size())
01351 cout << "ERROR: sub vector sizes wrong " << a.size() << ", " << b.size() << endl;
01352
01353 std::vector<float> c(a.size(), 0.0);
01354 for (unsigned i = 0; i < a.size(); i++){
01355 c[i] = a[i] - b[i];
01356 }
01357
01358 return c;
01359 }
01360
01361 void PO_ParallelETUCT::setFirst(){
01362 if (HISTORY_SIZE == 0) return;
01363
01364 if (HISTORYDEBUG) cout << "first action, set sahistory to 0s" << endl;
01365
01366 pthread_mutex_lock(&(history_mutex));
01367
01368 saHistory.resize(saHistory.size(), 0.0);
01369 pthread_mutex_unlock(&(history_mutex));
01370 }
01371
01372 void PO_ParallelETUCT::setSeeding(bool seeding){
01373
01374 if (HISTORYDEBUG) cout << "set seed mode to " << seeding << endl;
01375 seedMode = seeding;
01376
01377 }