00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include <stdarg.h>
00037 #include "config/compiler.h"
00038 #include "core/interface/wmp_interface.h"
00039 #include "core/include/global.h"
00040 #include "core/include/frames.h"
00041 #include "include/queue_core.h"
00042 #include "core/include/queues.h"
00043 #include "core/include/wmp_misc.h"
00044 #include "core/include/queue_core.h"
00045
00046 #ifdef USE_LONG_MESSAGE_COMPRESS
00047 #include <zlib.h>
00048 static Bytef * zd;
00049 static uLongf zlen;
00050 #endif
00051
00052
00053
00054 static int look_tx(queue_t * q) {
00055 int i, maxPri = -1, maxPriId = -1;
00056 long long age, older_age = 0;
00057 for (i = 0; i < q->max_msg_num; i++) {
00058 if (q->longMsg[i]->done) {
00059 age = getRawActualTimeus() - q->longMsg[i]->ts;
00060 if (q->longMsg[i]->priority > maxPri || (q->longMsg[i]->priority
00061 == maxPri && age > older_age)) {
00062 maxPri = q->longMsg[i]->priority;
00063 maxPriId = i;
00064 older_age = age;
00065 }
00066 }
00067 }
00068 return maxPriId;
00069 }
00070
00071 static int extend_size_if_necessary(queue_t *q, int idx, int size){
00072 if (size > q->longMsg[idx]->allocated_size){
00073 char * dp;
00074 WMP_ERROR(stderr, "Not enough space in element %d (%d bytes), allocating %d bytes\n",idx,q->longMsg[idx]->allocated_size,size);
00075 dp = MALLOC(size);
00076 if (dp != 0){
00077 FREE(q->longMsg[idx]->data);
00078 q->longMsg[idx]->data = dp;
00079 q->longMsg[idx]->allocated_size = size;
00080 }else{
00081 WMP_ERROR(stderr, "Error: Unable to allocate memory (queue_push_part), discarding message");
00082 q->longMsg[idx]->hash = 0;
00083 return 0;
00084 }
00085 }
00086 return 1;
00087 }
00088
00089
00090 int queue_tx_push_data(queue_t * q, unsigned int port, char * p, unsigned int size,
00091 unsigned int dest, signed char priority) {
00092 int i, nparts, must_signal = 0;
00093 int a = wmpGetNodeId();
00094 unsigned short ts = (unsigned short) (getRawActualTimeus() & 0x7F);
00095
00096 if (q->drop_next){
00097 q->drop_next = 0;
00098 return 1;
00099 }
00100 exclusive_on(q);
00101
00102 a = a << 11;
00103 ts = ts << 5;
00104
00105 q->hash_idx++;
00106 if (q->hash_idx > 14) {
00107 q->hash_idx = 1;
00108 }
00109
00110 nparts = size / q->message_part_size;
00111 if (size % q->message_part_size != 0){
00112 nparts++;
00113 }
00114
00115 for (i = 0; i < q->max_msg_num; i++) {
00116 if (q->longMsg[i]->hash == 0) {
00117
00118 if (!extend_size_if_necessary(q, i, size)) {
00119 exclusive_off(q);
00120 return 0;
00121 }
00122
00123 q->longMsg[i]->size = size;
00124 q->longMsg[i]->dest = dest;
00125 q->longMsg[i]->port = port;
00126 q->longMsg[i]->priority = priority;
00127 q->longMsg[i]->hash = a + ts +q->hash_idx;
00128
00129
00130 q->longMsg[i]->ts = getRawActualTimeus();
00131 q->longMsg[i]->msg_part_size = q->message_part_size;
00132 q->longMsg[i]->num_parts = nparts;
00133 q->longMsg[i]->parts_pointer = 0;
00134 q->longMsg[i]->this_part_size = 0;
00135
00136
00137 if (q->force_burst == port) {
00138 q->longMsg[i]->burst_hash = a + 2047;
00139 } else {
00140 q->longMsg[i]->burst_hash = q->longMsg[i]->hash;
00141 }
00142
00143 memcpy(q->longMsg[i]->data, p, size);
00144 q->elem++;
00145 q->longMsg[i]->done = 1;
00146 must_signal = 1;
00147 break;
00148 }
00149 }
00150 exclusive_off(q);
00151 if (must_signal) {
00152 for (i = 0; i < nparts; i++) {
00153 SIGNAL(q->sem);
00154 }
00155 return 1;
00156 }else{
00157 return 0;
00158 }
00159 }
00160 void queue_tx_force_burst(queue_t * q, int port) {
00161 q->force_burst = port;
00162 }
00163
00164 int queue_tx_get_count(queue_t * q) {
00165 return SEM_GET_COUNT(q->sem);
00166 }
00167
00168 int queue_tx_get_port_period(queue_t * q) {
00169 return SEM_GET_COUNT(q->sem);
00170 }
00171
00172 int queue_tx_pop_part(queue_t * q, longMsg_t ** p) {
00173 int id, ret = 0;
00174
00175 ret = WAIT_TIMED(q->sem,5000);
00176
00177 exclusive_on(q);
00178 if (ret == 0) {
00179 WMP_LM_DEBUG(stderr,"Lock pop data (TX)\n");
00180 id = look_tx(q);
00181 if (p!=0 && id >= 0 && id < q->max_msg_num) {
00182
00183
00184 q->longMsg[id]->pointer = q->longMsg[id]->data + (q->longMsg[id]->parts_pointer * q->message_part_size);
00185
00186
00187 if (q->longMsg[id]->num_parts == 1){
00188 q->longMsg[id]->this_part_size = q->longMsg[id]->size;
00189 }else{
00190 if (q->longMsg[id]->parts_pointer == (q->longMsg[id]->num_parts -1)){
00191 q->longMsg[id]->this_part_size = q->longMsg[id]->size - (q->message_part_size * q->longMsg[id]->parts_pointer) ;
00192 }else{
00193 q->longMsg[id]->this_part_size = q->message_part_size;
00194 }
00195 }
00196
00197
00198 if (q->longMsg[id]->parts_pointer == 0){
00199 q->longMsg[id]->part_id = - q->longMsg[id]->num_parts;
00200 }else{
00201 q->longMsg[id]->part_id = q->longMsg[id]->parts_pointer;
00202 }
00203 *p = q->longMsg[id];
00204 q->last_popped_id = id;
00205 }
00206 return id;
00207 } else {
00208 return -1;
00209 }
00210 }
00211
00212 int queue_tx_reschedule(queue_t * q) {
00213 if (q->last_popped_id >= 0 && q->last_popped_id < q->max_msg_num) {
00214 exclusive_on(q);
00215 q->longMsg[q->last_popped_id]->rescheduled = 1;
00216 exclusive_off(q);
00217 SIGNAL(q->sem);
00218 return 1;
00219 }
00220 return 0;
00221 }
00222
00223 int queue_tx_confirm(queue_t * q) {
00224 if (q->last_popped_id >= 0 && q->last_popped_id < q->max_msg_num) {
00225 exclusive_on(q);
00226 q->longMsg[q->last_popped_id]->rescheduled = 0;
00227 q->longMsg[q->last_popped_id]->parts_pointer++;
00228 if (q->longMsg[q->last_popped_id]->parts_pointer
00229 == q->longMsg[q->last_popped_id]->num_parts) {
00230 clear(q, q->last_popped_id);
00231 q->elem--;
00232 }
00233 exclusive_off(q);
00234 return 1;
00235 }
00236 return 0;
00237 }
00238
00239 void queue_tx_get_last_popped_info(queue_t * q, int * age, int *port, int * priority){
00240 if (q->last_popped_id >= 0 && q->last_popped_id < q->max_msg_num){
00241 *age = ((int) (getRawActualTimeus() - q->longMsg[q->last_popped_id]->ts))/1000;
00242 *port = q->longMsg[q->last_popped_id]->port;
00243 *priority = q->longMsg[q->last_popped_id]->priority;
00244 }
00245 }
00246
00247 int queue_tx_inspect_head(queue_t * q, longMsg_t ** p) {
00248 int maxPriId = look_tx(q);
00249 if (maxPriId > 0) {
00250 if (p != 0) {
00251 *p = q->longMsg[maxPriId];
00252 }
00253 return (int) (getRawActualTimeus - q->longMsg[maxPriId]->ts);
00254 } else {
00255 return -1;
00256 }
00257 }
00258
00259 int queue_tx_get_head_dest(queue_t * q){
00260 int maxPriId = look_tx(q);
00261 if (maxPriId>=0){
00262 return q->longMsg[maxPriId]->dest;
00263 }else{
00264 return -1;
00265 }
00266 }
00267 int queue_tx_get_head_age(queue_t * q){
00268 int maxPriId = look_tx(q);
00269 if (maxPriId>=0){
00270 return (int) (getRawActualTimeus() - q->longMsg[maxPriId]->ts);
00271 }else{
00272 return -1;
00273 }
00274 }
00275
00276 void queue_tx_pop_part_done(queue_t * q, int maxPriId) {
00277 exclusive_off(q);
00278 }
00279
00280 int queue_tx_remove_head(queue_t * q){
00281 int id = queue_tx_pop_part(q,0);
00282 queue_tx_pop_part_done(q,id);
00283 return id;
00284 }
00285
00286 void queue_tx_drop_next(queue_t * q){
00287 q->drop_next = 1;
00288 }
00289
00290 int queue_tx_get_head_id(queue_t * q){
00291 int maxPriId = look_tx(q);
00292 return maxPriId;
00293 }
00294 void queue_tx_drop_elem(queue_t * q, int id){
00295 exclusive_on(q);
00296 clear(q,id);
00297 if (queue_tx_get_count(q)>0){
00298 WAIT(q->sem);
00299 }
00300 exclusive_off(q);
00301 }
00302
00303 int wmp_queue_tx_get_head_dest(void);
00304
00305 void queue_tx_init(queue_t * q, int max_msg_size, int max_msg_num, int msg_part_size) {
00306
00307 int i, allocated = 0;
00308 q->max_msg_num = max_msg_num;
00309 q->max_msg_size = max_msg_size;
00310 q->message_part_size = msg_part_size;
00311 q->longMsg = (longMsg_t **) MALLOC(max_msg_num * sizeof(longMsg_t *));
00312 allocated += max_msg_num * sizeof(longMsg_t *);
00313 if (q->longMsg == 0) {
00314 WMP_ERROR(stderr,"Unable to allocate Memory (1)\n");
00315 return;
00316 }
00317
00318 for (i = 0; i < max_msg_num; i++) {
00319 q->longMsg[i] = (longMsg_t *) MALLOC(sizeof(longMsg_t));
00320 allocated += sizeof(longMsg_t);
00321 if (q->longMsg[i] == 0) {
00322 WMP_ERROR(stderr,"Unable to allocate Memory (2)\n");
00323 return;
00324 }
00325 memset(q->longMsg[i], 0, sizeof(longMsg_t));
00326
00327 q->longMsg[i]->data = MALLOC(max_msg_size);
00328 allocated += max_msg_size;
00329 if (q->longMsg[i]->data == 0) {
00330 WMP_ERROR(stderr,"Unable to allocate Memory (3)\n");
00331 return;
00332 }
00333 memset(q->longMsg[i]->data, 0, max_msg_size);
00334 q->longMsg[i]->allocated_size = max_msg_size;
00335 q->force_burst = -1;
00336 }
00337
00338 SEM_INIT(&q->sem, 0, 0);
00339 MUTEX_INIT(&q->uniq_mtx);
00340
00341 WMP_MSG(stderr,"*** Queues loaded, allocated %d kbytes\n", allocated/1024);
00342 }
00343
00344 void queue_tx_free(queue_t * q) {
00345 int i;
00346 for (i = 0; i < q->max_msg_num; i++) {
00347 FREE(q->longMsg[i]->data);
00348 FREE(q->longMsg[i]);
00349 }
00350 FREE(q->longMsg);
00351
00352 #ifdef USE_LONG_MESSAGE_COMPRESS
00353 free(zd);
00354 #endif
00355
00356 }