00001
00002
00003
00004
00005
00006 static char *rcsid="@(#)$Id$";
00007 #if THREADED
00008
00009 #include "eus.h"
00010
00011 sema_t free_thread_sem;
00012 pointer free_threads;
00013 mutex_t free_thread_lock;
00014 mutex_t qthread_lock;
00015
00016
00017 mutex_t alloc_lock;
00018 rwlock_t gc_lock;
00019
00020
00021 mutex_t qsort_lock;
00022
00023
00024
00025 mutex_t mark_lock;
00026 char *mark_locking;
00027 int mark_lock_thread;
00028
00029
00030 pointer get_free_thread()
00031 { register pointer port;
00032 GC_REGION(sema_wait(&free_thread_sem););
00033 mutex_lock(&free_thread_lock);
00034 port=ccar(free_threads);
00035 free_threads=ccdr(free_threads);
00036 mutex_unlock(&free_thread_lock);
00037 return(port);}
00038
00039 extern int next_special_index;
00040
00041 void thread_main(port)
00042 pointer port;
00043 { pointer val, *spsave, argp;
00044 jmp_buf thjmp;
00045 context *ctx;
00046 unsigned int tid, argc;
00047 int i;
00048 pointer myspecs, reqspecs;
00049
00050 tid=thr_self();
00051
00052 if (tid>=MAXTHREAD) thr_exit(0);
00053 ctx=(context *)((eusinteger_t)port->c.thrp.contex & ~2L);
00054 euscontexts[tid]=ctx;
00055
00056 mkcatchframe(ctx, makeint(0),&thjmp);
00057 mutex_lock(&free_thread_lock);
00058 free_threads=cons(ctx,port, free_threads);
00059 mutex_unlock(&free_thread_lock);
00060
00061 thread_loop:
00062 sema_post(&free_thread_sem);
00063 port->c.thrp.idle=T;
00064 GC_REGION(sema_wait((sema_t *)(port->c.thrp.reqsem->c.ivec.iv)););
00065 port->c.thrp.idle=NIL;
00066
00067 myspecs= ctx->specials;
00068 reqspecs= euscontexts[intval(port->c.thrp.requester)]->specials;
00069 for (i=0; i<next_special_index; i++)
00070 myspecs->c.vec.v[i]= reqspecs->c.vec.v[i];
00071 sema_post((sema_t *)port->c.thrp.runsem->c.ivec.iv);
00072
00073
00074 if ((val=(pointer)eussetjmp(thjmp))==0) {
00075 spsave=ctx->vsp;
00076 argp=port->c.thrp.args;
00077 argc=0;
00078 while (argp!=NIL) {
00079 ckpush(ccar(argp)); argp=ccdr(argp); argc++;}
00080
00081
00082 val=(pointer)ufuncall(ctx, port,
00083 port->c.thrp.func,(pointer)spsave,NULL,argc);
00084 }
00085 else if (val==(pointer)1) {
00086 val=makeint(0);
00087 fprintf(stderr, "thread %d reset\n", tid);}
00088 port->c.thrp.result=val;
00089 if (port->c.thrp.wait!=NIL) {
00090 sema_post((sema_t *)port->c.thrp.donesem->c.ivec.iv);
00091 GC_REGION(sema_wait((sema_t *)port->c.thrp.reqsem->c.ivec.iv););
00092
00093 }
00094
00095 mutex_lock(&free_thread_lock);
00096 free_threads=cons(ctx, port, free_threads);
00097 mutex_unlock(&free_thread_lock);
00098 goto thread_loop;
00099
00100 deletecontext(tid,ctx);
00101 }
00102
00103 pointer MAKE_THREAD(ctx, n, argv)
00104 context *ctx;
00105 int n;
00106 pointer argv[];
00107 { int stack_size, c_stack_size;
00108 context *newctx;
00109 pointer newport, thrlist=NIL;
00110 int i,count;
00111 unsigned int tid;
00112
00113 ckarg2(1,3);
00114 count=ckintval(argv[0]);
00115 if (n>=2) stack_size=ckintval(argv[1]);
00116 else stack_size=32*1024;
00117 if (n==3) c_stack_size=ckintval(argv[2]);
00118 else c_stack_size=stack_size*sizeof(pointer);
00119 for (i=0; i<count; i++) {
00120 newctx=(context *)makelispcontext(stack_size);
00121 newport=(pointer)makethreadport(newctx);
00122 ckpush(newport);
00123 GC_REGION(mutex_lock(&qthread_lock););
00124 #ifdef RGC
00125 active_mutator_num++;
00126 #endif
00127 speval(QTHREADS)=cons(ctx, newport, speval(QTHREADS));
00128 mutex_unlock(&qthread_lock);
00129 #if alpha || PTHREAD
00130 if( thr_create(0, c_stack_size, thread_main, newport, 0, &tid ) != 0 ) {
00131 deletecontext(tid, ctx);
00132 error(E_USER,(pointer)"Number of threads reached limit (64)");
00133 }
00134 newport->c.thrp.id=makeint(tid);
00135 #else
00136 thr_create(0, c_stack_size, (void *(*)(void *))thread_main,
00137 newport, THR_SUSPENDED, &tid);
00138 if (tid>=MAXTHREAD) {
00139 deletecontext(tid, ctx);
00140 error(E_USER,(pointer)"Number of threads reached limit (64)");
00141 }
00142 newport->c.thrp.id=makeint(tid);
00143 thr_continue(tid);
00144 #endif
00145 }
00146 thrlist=stacknlist(ctx,count);
00147 return(thrlist);}
00148
00149 pointer AFUNCALL(ctx, n, argv)
00150 context *ctx;
00151 int n;
00152 pointer argv[];
00153 { register pointer port, args;
00154 register int i;
00155 port=get_free_thread();
00156 port->c.thrp.requester=makeint(thr_self());
00157 port->c.thrp.func=argv[0];
00158 args=NIL;
00159 for (i=1; i<n; i++) { args=cons(ctx,argv[n-i],args);}
00160 port->c.thrp.args=args;
00161 port->c.thrp.wait=T;
00162
00163
00164 sema_post((sema_t *)port->c.thrp.reqsem->c.ivec.iv);
00165
00166
00167
00168
00169 GC_REGION(sema_wait((sema_t *)port->c.thrp.runsem->c.ivec.iv););
00170 return(port);}
00171
00172 pointer AFUNCALL_NO_WAIT(ctx, n, argv)
00173 context *ctx;
00174 int n;
00175 pointer argv[];
00176 { register pointer port, args;
00177 register int i;
00178 port=get_free_thread();
00179 port->c.thrp.requester=makeint(thr_self());
00180 port->c.thrp.func=argv[0];
00181 args=NIL;
00182 for (i=1; i<n; i++) { args=cons(ctx,argv[n-i],args);}
00183 port->c.thrp.args=args;
00184 port->c.thrp.wait=NIL;
00185 sema_post((sema_t *)port->c.thrp.reqsem->c.ivec.iv);
00186
00187
00188
00189 GC_REGION(sema_wait((sema_t *)port->c.thrp.runsem->c.ivec.iv););
00190 return(port);}
00191
00192 pointer WAIT_AFUNCALL(ctx,n,argv)
00193 context *ctx;
00194 int n;
00195 pointer argv[];
00196 { register pointer port, result;
00197 ckarg(1);
00198 port=argv[0];
00199 if (port->c.thrp.wait!=NIL &&
00200 ( 1 ||
00201 port->c.thrp.reqsem->c.ivec.iv[0]>0)) {
00202 GC_REGION(sema_wait((sema_t *)port->c.thrp.donesem->c.ivec.iv););
00203 result=port->c.thrp.result;
00204 sema_post((sema_t *)port->c.thrp.reqsem->c.ivec.iv);
00205 return(result);}
00206 else error(E_USER,(pointer)"wait thread for idle thread");}
00207
00208 pointer FREE_THREADS(ctx,n,argv)
00209 context *ctx;
00210 int n;
00211 pointer argv[];
00212 { return(free_threads);}
00213
00214
00215
00216 #if Linux && !Darwin
00217 #define PTHREAD_MUTEX_NORMAL PTHREAD_MUTEX_ADAPTIVE_NP
00218 #define PTHREAD_MUTEX_RECURSIVE PTHREAD_MUTEX_RECURSIVE_NP
00219 #define PTHREAD_MUTEX_ERRORCHECK PTHREAD_MUTEX_ERRORCHECK_NP
00220 #endif
00221
00222 pointer MAKE_MUTEX_LOCK(ctx,n,argv)
00223 context *ctx;
00224 int n;
00225 pointer argv[];
00226 { register pointer m;
00227 m=makevector(C_INTVECTOR, (sizeof(mutex_t)+sizeof(eusinteger_t)-1)/sizeof(eusinteger_t));
00228 #if alpha
00229 pthread_mutex_init((mutex_t *)m->c.ivec.iv,pthread_mutexattr_default);
00230 #elif PTHREAD
00231 {
00232 pthread_mutexattr_t attr;
00233 pthread_mutexattr_init(&attr);
00234 if (n==1 && isint(argv[0])) {
00235 pthread_mutexattr_settype(&attr, intval(argv[0]));
00236 }else{
00237 pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL);
00238 }
00239 pthread_mutex_init((mutex_t *)m->c.ivec.iv, &attr);
00240 }
00241 #else
00242 mutex_init((mutex_t *)m->c.ivec.iv,USYNC_THREAD,0);
00243 #endif
00244 return(m);}
00245
00246 pointer MUTEX_LOCK(ctx,n,argv)
00247 context *ctx;
00248 int n;
00249 register pointer argv[];
00250 { ckarg(1);
00251 if (!isintvector(argv[0])) error(E_NOINTVECTOR);
00252 mutex_lock((mutex_t *)argv[0]->c.ivec.iv);
00253 return(T);}
00254
00255 pointer MUTEX_TRYLOCK(ctx,n,argv)
00256 context *ctx;
00257 int n;
00258 register pointer argv[];
00259 { ckarg(1);
00260 if (!isintvector(argv[0])) error(E_NOINTVECTOR);
00261 #if SunOS4_1
00262 if (mutex_trylock((mutex_t *)argv[0]->c.ivec.iv)==-1) return(NIL);
00263 #else
00264 if (mutex_trylock((mutex_t *)argv[0]->c.ivec.iv)==EBUSY) return(NIL);
00265 #endif
00266 return(T);}
00267
00268 pointer MUTEX_UNLOCK(ctx,n,argv)
00269 context *ctx;
00270 int n;
00271 register pointer argv[];
00272 { ckarg(1);
00273 if (!isintvector(argv[0])) error(E_NOINTVECTOR);
00274 mutex_unlock((mutex_t *)argv[0]->c.ivec.iv);
00275 return(T);}
00276
00277
00278
00279 pointer MAKE_COND(ctx,n,argv)
00280 context *ctx;
00281 int n;
00282 pointer argv[];
00283 { register pointer m;
00284 #if SunOS4_1
00285 ckarg(1);
00286 #endif
00287 m=makevector(C_INTVECTOR, (sizeof(cond_t)+sizeof(eusinteger_t)-1)/sizeof(eusinteger_t));
00288 #if alpha
00289 pthread_cond_init((cond_t*)m->c.ivec.iv, pthread_condattr_default);
00290 #elif PTHREAD
00291 pthread_cond_init((cond_t*)m->c.ivec.iv, NULL);
00292 #else
00293 #if SunOS4_1
00294 cond_init((cond_t *)m->c.ivec.iv, (mutex_t *)argv[0]->c.ivec.iv);
00295 #else
00296 cond_init((cond_t *)m->c.ivec.iv,USYNC_THREAD,0);
00297 #endif
00298 #endif
00299 return(m);}
00300
00301 pointer COND_WAIT(ctx,n,argv)
00302 context *ctx;
00303 int n;
00304 register pointer argv[];
00305 { ckarg(2);
00306 if (!isintvector(argv[0]) || !isintvector(argv[0])) error(E_NOINTVECTOR);
00307 cond_wait((cond_t *)argv[0]->c.ivec.iv, (mutex_t *)argv[1]->c.ivec.iv);
00308 return(T);}
00309
00310 pointer COND_SIGNAL(ctx,n,argv)
00311 context *ctx;
00312 int n;
00313 register pointer argv[];
00314 { ckarg2(1,2);
00315 if (!isintvector(argv[0])) error(E_NOINTVECTOR);
00316 if (n==2 && argv[1]!=NIL) cond_broadcast((cond_t *)argv[0]->c.ivec.iv);
00317 else cond_signal((cond_t *)argv[0]->c.ivec.iv);
00318 return(T);}
00319
00320
00321
00322 pointer MAKE_SEMAPHORE(ctx,n,argv)
00323 context *ctx;
00324 int n;
00325 pointer argv[];
00326 { pointer s;
00327 s=makevector(C_INTVECTOR, (sizeof(sema_t)+sizeof(eusinteger_t)-1)/sizeof(eusinteger_t));
00328 sema_init((sema_t *)s->c.ivec.iv, 0, USYNC_THREAD, 0);
00329 return(s);}
00330
00331 pointer SEMA_POST(ctx,n,argv)
00332 context *ctx;
00333 int n;
00334 register pointer argv[];
00335 { if (!isintvector(argv[0])) error(E_NOINTVECTOR);
00336 sema_post((sema_t *)argv[0]->c.ivec.iv);
00337 return(T);}
00338
00339 pointer SEMA_WAIT(ctx,n,argv)
00340 context *ctx;
00341 int n;
00342 pointer argv[];
00343 { if (!isintvector(argv[0])) error(E_NOINTVECTOR);
00344 GC_REGION(sema_wait((sema_t *)argv[0]->c.ivec.iv););
00345 return(T);}
00346
00347 pointer SEMA_TRYWAIT(ctx,n,argv)
00348 context *ctx;
00349 int n;
00350 pointer argv[];
00351 { if (!isintvector(argv[0])) error(E_NOINTVECTOR);
00352 if (sema_trywait((sema_t *)argv[0]->c.ivec.iv)==0) return(T);
00353 else return(NIL);}
00354
00355
00356 pointer THR_SELF(ctx,n,argv)
00357 register context *ctx;
00358 int n;
00359 pointer argv[];
00360 {
00361 return(makeint(thr_self()));}
00362
00363 pointer THREAD_SELF(ctx,n,argv)
00364 register context *ctx;
00365 int n;
00366 pointer argv[];
00367 {
00368 return(euscontexts[thr_self()]->threadobj);}
00369
00370 pointer THR_SETPRIO(ctx,n,argv)
00371 register context *ctx;
00372 int n;
00373 pointer argv[];
00374 { int stat;
00375 ckarg(2);
00376 stat=thr_setprio(ckintval(argv[0]),ckintval(argv[1]));
00377 if (stat) return(makeint(-errno));
00378 else return(T);}
00379
00380 pointer THR_GETPRIO(ctx,n,argv)
00381 register context *ctx;
00382 int n;
00383 pointer argv[];
00384 { int stat,prio;
00385 ckarg(1);
00386 stat=thr_getprio(ckintval(argv[0]), &prio);
00387 if (stat) return(makeint(-errno));
00388 else return(makeint(prio));}
00389
00390 pointer THR_SETCONCURRENCY(ctx,n,argv)
00391 register context *ctx;
00392 int n;
00393 pointer argv[];
00394 { int stat;
00395 ckarg(1);
00396 #if SunOS4_1 || alpha || PTHREAD
00397 fprintf(stderr, "thr_setconcurrency is not supprted!\n");
00398 stat = 0;
00399 #else
00400 stat=thr_setconcurrency(ckintval(argv[0]));
00401 #endif
00402 if (stat) return(makeint(stat));
00403 else return(T);}
00404
00405 pointer THR_GETCONCURRENCY(ctx,n,argv)
00406 register context *ctx;
00407 int n;
00408 pointer argv[];
00409 { int concurrency;
00410 ckarg(0);
00411 #if SunOS4_1 || alpha || PTHREAD
00412 fprintf(stderr, "thr_getconcurrency is not supprted!\n");
00413 concurrency = 0;
00414 #else
00415 concurrency=thr_getconcurrency();
00416 #endif
00417 return(makeint(concurrency));}
00418
00419 void newthread(ta)
00420 struct thread_arg *ta;
00421 { int tid;
00422 pointer func,argv[1],result,val;
00423 jmp_buf thjmp;
00424
00425 tid=thr_self();
00426 euscontexts[tid]=ta->newctx;
00427 fprintf(stderr,"new thread_id=%d\n",tid);
00428 mkcatchframe(ta->newctx, makeint(0),&thjmp);
00429 if ((val=(pointer)eussetjmp(thjmp))==0) {
00430 argv[0]=ta->arg;
00431 result=ufuncall(ta->newctx, ta->form, ta->func,(pointer)argv,NULL,1);}
00432 else if (val==(pointer)1) val=makeint(0);
00433 fprintf(stderr, "thread %d terminated\n", tid);
00434 deletecontext(tid,ta->newctx);
00435 cfree(ta);
00436 }
00437
00438 pointer THR_CREATE(ctx,n,argv)
00439 register context *ctx;
00440 int n;
00441 pointer argv[];
00442 { int stack_size,stat;
00443 context *newctx;
00444 unsigned int thread_id;
00445 pointer result;
00446 struct thread_arg *ta;
00447 pointer func=argv[0], arg=argv[1];
00448
00449 ckarg2(2,3);
00450 if (n==3) stack_size=ckintval(argv[2]);
00451 else stack_size=1024*64;
00452
00453 newctx=(context *)makelispcontext(stack_size);
00454 fprintf(stderr,"creater newcontext=%p\n", newctx);
00455 ta=(struct thread_arg *)malloc(sizeof(struct thread_arg));
00456 ta->form=ctx->callfp->form;
00457 ta->newctx=newctx;
00458 ta->func=func;
00459 ta->arg=arg;
00460 stat=thr_create(0, stack_size, (void (*)(void *))newthread,
00461 ta,0,&thread_id);
00462 if (stat) result=makeint(-errno);
00463 else
00464 result=makeint(thread_id);
00465 return(result);
00466 }
00467
00468 pointer THR_KILL(ctx,n,argv)
00469 register context *ctx;
00470 int n;
00471 pointer argv[];
00472 { int tid;
00473 int sig;
00474 ckarg(2);
00475 tid=ckintval(argv[0]);
00476 sig=ckintval(argv[1]);
00477 if (euscontexts[tid]) { thr_kill(tid,sig); return(T);}
00478 else return(NIL);}
00479
00480 pointer THR_SUSPEND(ctx,n,argv)
00481 register context *ctx;
00482 int n;
00483 pointer argv[];
00484 { int tid;
00485 ckarg(1);
00486 #if alpha
00487 fprintf(stderr,"thr_suspend is not implemented.\n" );
00488 return(NIL);
00489 #else
00490 tid=ckintval(argv[0]);
00491 if (euscontexts[tid]) {
00492 if (thr_suspend(tid)==0) return(T);
00493 else return(makeint(-errno));}
00494 else return(NIL);
00495 #endif
00496 }
00497
00498 pointer THR_CONTINUE(ctx,n,argv)
00499 register context *ctx;
00500 int n;
00501 pointer argv[];
00502 { int tid;
00503 ckarg(1);
00504 #if alpha
00505 fprintf(stderr,"thr_continue is not implemented.\n");
00506 return(NIL);
00507 #else
00508 tid=ckintval(argv[0]);
00509 if (euscontexts[tid]) {
00510 if (thr_continue(tid)==0) return(T);
00511 else return(makeint(-errno));}
00512 else return(NIL);
00513 #endif
00514 }
00515
00516 #if Solaris2
00517 pointer THR_SIGSETMASK(ctx,n,argv)
00518 register context *ctx;
00519 int n;
00520 pointer argv[];
00521 { int how, stat;
00522 eusinteger_t *oset;
00523
00524 ckarg2(2,3);
00525 how=ckintval(argv[0]);
00526 if (n==3) oset=argv[2]->c.ivec.iv;
00527 else oset=NULL;
00528 stat=thr_sigsetmask(how, argv[1]->c.ivec.iv, oset);
00529 if (stat==0) {
00530 if (n==3) return(argv[2]);
00531 else return(T);}
00532 else return(makeint(-errno));
00533 }
00534 #endif
00535
00536 int mthread(ctx,mod)
00537 register context *ctx;
00538 pointer mod;
00539 {
00540 free_threads=NIL;
00541
00542 defunpkg(ctx,"THR-SELF",mod,THR_SELF,unixpkg);
00543 defunpkg(ctx,"THR-GETPRIO",mod,THR_GETPRIO,unixpkg);
00544 defunpkg(ctx,"THR-SETPRIO",mod,THR_SETPRIO,unixpkg);
00545 defunpkg(ctx,"THR-GETCONCURRENCY",mod,THR_GETCONCURRENCY,unixpkg);
00546 defunpkg(ctx,"THR-SETCONCURRENCY",mod,THR_SETCONCURRENCY,unixpkg);
00547 defunpkg(ctx,"THR-CREATE",mod,THR_CREATE,unixpkg);
00548 defunpkg(ctx,"THR-KILL",mod,THR_KILL,unixpkg);
00549 defunpkg(ctx,"THR-SUSPEND",mod,THR_SUSPEND,unixpkg);
00550 defunpkg(ctx,"THR-CONTINUE",mod,THR_CONTINUE,unixpkg);
00551 #if Solaris2
00552 defunpkg(ctx,"THR-SIGSETMASK",mod,THR_SIGSETMASK,unixpkg);
00553 #endif
00554
00555 defunpkg(ctx,"THREAD-SELF",mod,THREAD_SELF,syspkg);
00556 defunpkg(ctx,"MAKE-THREAD",mod,MAKE_THREAD,syspkg);
00557 defunpkg(ctx,"THREAD",mod,AFUNCALL, syspkg);
00558 defunpkg(ctx,"THREAD-NO-WAIT",mod,AFUNCALL_NO_WAIT, syspkg);
00559 defunpkg(ctx,"WAIT-THREAD",mod,WAIT_AFUNCALL,syspkg);
00560 defunpkg(ctx,"FREE-THREADS",mod,FREE_THREADS,syspkg);
00561
00562 defunpkg(ctx,"MAKE-MUTEX-LOCK",mod,MAKE_MUTEX_LOCK,syspkg);
00563 defunpkg(ctx,"MUTEX-LOCK",mod, MUTEX_LOCK,syspkg);
00564 defunpkg(ctx,"MUTEX-UNLOCK",mod, MUTEX_UNLOCK,syspkg);
00565 defunpkg(ctx,"MUTEX-TRYLOCK",mod, MUTEX_TRYLOCK,syspkg);
00566 #if PTHREAD
00567 defvar(ctx,"*PTHREAD-MUTEX-FAST*",makeint(PTHREAD_MUTEX_NORMAL),syspkg);
00568 defvar(ctx,"*PTHREAD-MUTEX-RECURSIVE*",makeint(PTHREAD_MUTEX_RECURSIVE),syspkg);
00569 defvar(ctx,"*PTHREAD-MUTEX-ERRORCHECK*",makeint(PTHREAD_MUTEX_ERRORCHECK),syspkg);
00570 #endif
00571
00572 defunpkg(ctx,"MAKE-COND",mod,MAKE_COND,syspkg);
00573 defunpkg(ctx,"COND-WAIT",mod, COND_WAIT,syspkg);
00574 defunpkg(ctx,"COND-SIGNAL",mod, COND_SIGNAL,syspkg);
00575
00576 defunpkg(ctx,"MAKE-SEMAPHORE",mod,MAKE_SEMAPHORE,syspkg);
00577 defunpkg(ctx,"SEMA-POST",mod, SEMA_POST,syspkg);
00578 defunpkg(ctx,"SEMA-WAIT",mod, SEMA_WAIT,syspkg);
00579 defunpkg(ctx,"SEMA-TRYWAIT",mod, SEMA_TRYWAIT,syspkg);
00580 }
00581
00582 #endif