00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include <stdarg.h>
00037 #include "config/compiler.h"
00038 #include "core/interface/wmp_interface.h"
00039 #include "core/include/global.h"
00040 #include "core/include/frames.h"
00041 #include "include/queue_core.h"
00042 #include "core/include/queues.h"
00043 #include "core/include/wmp_misc.h"
00044 #include "core/include/queue_core.h"
00045
00046
00047
00048
00049
00050
00051
00052 typedef struct {
00053 unsigned short hash;
00054 short part_id;
00055 } memory_t;
00056
00057 static memory_t memory[10];
00058 static int memory_id = 0;
00059 static int hash_exists(unsigned short hash, short part_id) {
00060 int i;
00061 for (i = 0; i < 10; i++) {
00062 if (hash == memory[i].hash && part_id == memory[i].part_id) {
00063 return 1;
00064 }
00065 }
00066 memory[memory_id].hash = hash;
00067 memory[memory_id].part_id = part_id;
00068 memory_id++;
00069 if (memory_id == 10) {
00070 memory_id = 0;
00071 }
00072 return 0;
00073 }
00074
00075
00076 static int extend_size_if_necessary(queue_t *q, int idx){
00077 int total_msg_size, msg_size_till_this_part, size;
00078
00079 total_msg_size = q->longMsg[idx]->num_parts * q->longMsg[idx]->msg_part_size;
00080 msg_size_till_this_part = q->longMsg[idx]->part_id * q->longMsg[idx]->msg_part_size;
00081 size = total_msg_size > msg_size_till_this_part ? total_msg_size : msg_size_till_this_part;
00082
00083 if (size > q->longMsg[idx]->allocated_size){
00084 char * dp;
00085 WMP_ERROR(stderr, "Not enough space in element %d (%d bytes), allocating %d bytes\n",idx,q->longMsg[idx]->allocated_size,total_msg_size);
00086 dp = MALLOC(size);
00087 if (dp != 0){
00088 memcpy(dp, q->longMsg[idx]->data, q->longMsg[idx]->allocated_size);
00089 FREE(q->longMsg[idx]->data);
00090 q->longMsg[idx]->data = dp;
00091 q->longMsg[idx]->allocated_size = size;
00092 }else{
00093 WMP_ERROR(stderr, "Error: Unable to allocate memory (queue_push_part), discarding message");
00094 q->longMsg[idx]->hash = 0;
00095 return 0;
00096 }
00097 }
00098
00099 size = q->longMsg[idx]->num_parts > q->longMsg[idx]->part_id ? q->longMsg[idx]->num_parts : q->longMsg[idx]->part_id;
00100 if (size > q->longMsg[idx]->max_message_parts){
00101 char * dp;
00102 WMP_ERROR(stderr, "max_message_parts is too small %d (%d bytes), allocating %d bytes\n",idx,q->longMsg[idx]->max_message_parts,size);
00103 dp = MALLOC(size);
00104 if (dp != 0){
00105 memcpy(dp, q->longMsg[idx]->received_part, q->longMsg[idx]->max_message_parts);
00106 FREE(q->longMsg[idx]->received_part);
00107 q->longMsg[idx]->received_part = dp;
00108 q->longMsg[idx]->max_message_parts = size;
00109 }else{
00110 WMP_ERROR(stderr, "Error: Unable to allocate memory for extend message_parts (queue_push_part), discarding message");
00111 q->longMsg[idx]->hash = 0;
00112 return 0;
00113 }
00114 }
00115 return 1;
00116 }
00117
00118
00119 int queue_push_part(queue_t * q, longMsg_t * m) {
00120 int i, idx = -1, must_signal = 0, cnt=0;
00121 exclusive_on(q);
00122 if (m->port < 0 || m->port >= q->num_ports){
00123 WMP_ERROR(stderr, "*** WARNING: Discarded message-part due to incorrect port number (port: %d)\n", m->port);
00124 exclusive_off(q);
00125 return 0;
00126 }
00127 if (hash_exists(m->hash, m->part_id)){
00128 static int dropped = 0;
00129 dropped++;
00130 if (dropped % 10 == 0){
00131 WMP_ERROR(stderr, "*** WARNING: Discarded 10 message-parts due existent hash (hash: %d)\n",m->hash);
00132 }
00133 exclusive_off(q);
00134 return 0;
00135 }
00136
00137 for (i = 0; i < q->max_msg_num; i++) {
00138
00139 if (q->longMsg[i]->hash != 0 && !q->longMsg[i]->done) {
00140 long long age = getRawActualTimeus() - q->longMsg[i]->ts;
00141 if (age > ((long long) 5000000)) {
00142 cnt++;
00143 clear(q, i);
00144 }
00145 }
00146 }
00147 if (cnt > 0){
00148 WMP_ERROR(stderr, "Discarding old message-part (count: %d)\n", cnt);
00149 }
00150
00151 for (i = 0; i < q->max_msg_num; i++) {
00152
00153 if (q->longMsg[i]->hash == m->hash) {
00154 idx = i;
00155 break;
00156 }
00157 }
00158
00159 if (idx < 0) {
00160 for (i = 0; i < q->max_msg_num; i++) {
00161
00162 if (q->longMsg[i]->hash == 0) {
00163 idx = i;
00164 break;
00165 }
00166 }
00167 }
00168
00169
00170
00171
00172
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189
00190
00191
00192
00193 if (idx >= 0) {
00194 int done;
00195
00196 if (m->part_id < 0) {
00197 q->longMsg[idx]->num_parts = -m->part_id;
00198 m->part_id = 0;
00199 }
00200
00201 q->longMsg[idx]->received_part[m->part_id] = 1;
00202 q->longMsg[idx]->src = m->src;
00203 q->longMsg[idx]->msg_part_size = m->msg_part_size;
00204 q->longMsg[idx]->size += m->size;
00205 q->longMsg[idx]->received_parts++;
00206 q->longMsg[idx]->hash = m->hash;
00207 q->longMsg[idx]->port = m->port;
00208 q->longMsg[idx]->priority = m->priority;
00209 q->longMsg[idx]->ts = getRawActualTimeus();
00210
00211 if (!extend_size_if_necessary(q,idx)){
00212 exclusive_off(q);
00213 return 0;
00214 }
00215
00216 memcpy(q->longMsg[idx]->data + (m->part_id * q->longMsg[idx]->msg_part_size), m->data, m->size);
00217
00218 done = (q->longMsg[idx]->num_parts > 0);
00219 for (i=0;i<q->longMsg[idx]->num_parts;i++){
00220 done = done & q->longMsg[idx]->received_part[i];
00221 }
00222
00223 if (done) {
00224 WMP_LM_DEBUG(stderr, "DONE hash: %d size:%d port %d \n",
00225 q->longMsg[idx]->hash, q->longMsg[idx]->size, q->longMsg[idx]->port);
00226 q->longMsg[idx]->done = 1;
00227 must_signal = 1;
00228 }
00229 } else {
00230 WMP_ERROR(stderr, "*** (RT-WMP) NO SPACE to receive new message (queued elements:%d)\n", queue_rx_get_count(q,m->port));
00231 exclusive_off(q);
00232 return -1;
00233 }
00234
00235 exclusive_off(q);
00236
00237 if (must_signal){
00238 SIGNAL(*q->sems[m->port]);
00239 }
00240 return 0;
00241 }
00242
00243 int queue_rx_push_loop_data(queue_t * q, unsigned int port, char * p,
00244 unsigned int size, unsigned int dest, signed char priority, unsigned char src) {
00245 int i, idx =-1;
00246 exclusive_on(q);
00247
00248 if (port < 0 || port >= q->num_ports) {
00249 WMP_ERROR(stderr,"Discarding loop data due to incorrect port number (%d) @ loop\n",port);
00250 exclusive_off(q);
00251 return 0;
00252 }
00253
00254 for (i = 0; i < q->max_msg_num; i++) {
00255 if (q->longMsg[i]->hash == 0) {
00256 idx = i;
00257 }
00258 }
00259
00260 if (idx >= 0) {
00261 q->longMsg[idx]->hash = -1;
00262 q->longMsg[idx]->size = size;
00263 q->longMsg[idx]->dest = dest;
00264 q->longMsg[idx]->src = src;
00265 q->longMsg[idx]->port = port;
00266 q->longMsg[idx]->ts = getRawActualTimeus();
00267 q->longMsg[idx]->num_parts = 1;
00268 q->longMsg[idx]->msg_part_size = size;
00269 q->longMsg[idx]->priority = priority;
00270
00271 if (!extend_size_if_necessary(q,idx)){
00272 exclusive_off(q);
00273 return 0;
00274 }
00275
00276 memcpy(q->longMsg[idx]->data, p, size);
00277 q->longMsg[idx]->done = 1;
00278 exclusive_off(q);
00279 SIGNAL(*q->sems[port]);
00280 return 1;
00281 } else {
00282 exclusive_off(q);
00283 return 0;
00284 }
00285 }
00286
00287 static int look_rx(queue_t * q, int port) {
00288 int i, maxPri = -1, maxPriId = -1;
00289 long long age, older_age = 0;
00290 for (i = 0; i < q->max_msg_num; i++) {
00291 if (q->longMsg[i]->done && q->longMsg[i]->port == port) {
00292 age = getRawActualTimeus() - q->longMsg[i]->ts;
00293 if (q->longMsg[i]->priority > maxPri || (q->longMsg[i]->priority == maxPri && age > older_age)) {
00294 maxPri = q->longMsg[i]->priority;
00295 maxPriId = i;
00296 older_age = age;
00297 }
00298 }
00299 }
00300 return maxPriId;
00301 }
00302
00303
00304
00305 int queue_rx_get_mpm_size(queue_t * q, int port) {
00306 int i, maxPri = 0, maxPriId = -1;
00307
00308 if (port >= q->num_ports || port < 0 ){
00309 WMP_ERROR(stderr," *** (RT-WMP) GetDataSize port number out of range, discarding");
00310 return -1;
00311 }
00312
00313 for (i = 0; i < q->max_msg_num; i++) {
00314 if (q->longMsg[i]->done && q->longMsg[i]->port == port) {
00315 if (q->longMsg[i]->priority > maxPri) {
00316 maxPri = q->longMsg[i]->priority;
00317 maxPriId = i;
00318 }
00319 }
00320 }
00321 if (maxPriId != -1) {
00322 return (int) q->longMsg[maxPriId]->size;
00323 } else {
00324 return -1;
00325 }
00326 }
00327 int queue_rx_wait_data(queue_t * q, int port, int delay){
00328 int ret;
00329 if (port >= q->num_ports || port < 0) {
00330 WMP_ERROR(stderr,
00331 " *** (RT-WMP) Pop port number (%d) out of range (%d), discarding\n", port, q->num_ports);
00332 return -1;
00333 }
00334 if (delay == 0) {
00335 ret = WAIT(*q->sems[port]);
00336 } else if (delay > 0){
00337 ret = WAIT_TIMED(*q->sems[port],delay);
00338 }else{
00339 if (queue_rx_get_count(q,port)>0){
00340 ret = 0;
00341 }else{
00342 ret = -1;
00343 }
00344 }
00345 if (ret == 0){
00346 int id;
00347 exclusive_on(q);
00348 id = look_rx(q, port);
00349 exclusive_off(q);
00350 return id;
00351 }else{
00352 return -1;
00353 }
00354 }
00355
00356 int queue_rx_pop_data(queue_t * q, unsigned int delay, unsigned int port, char ** data, unsigned int * size,
00357 unsigned char * src, signed char * priority) {
00358 int id, ret = 0;
00359 if (port >= q->num_ports || port < 0) {
00360 WMP_ERROR(stderr,
00361 " *** (RT-WMP) Pop port number (%d) out of range (%d), discarding\n", port, q->num_ports);
00362 exclusive_on(q);
00363 return -1;
00364 }
00365
00366 if (delay == 0) {
00367 ret = WAIT(*q->sems[port]);
00368 } else if (delay >0){
00369 ret = WAIT_TIMED(*q->sems[port],delay);
00370 }else{
00371 ret = 0;
00372 }
00373
00374 exclusive_on(q);
00375
00376 if (ret == 0) {
00377 WMP_LM_DEBUG(stderr,"Lock pop data RX\n");
00378 id = look_rx(q, port);
00379 if (id >= 0 && id < q->max_msg_num) {
00380 if (data != 0) (*data) = q->longMsg[id]->data;
00381 if (src != 0) (*src) = q->longMsg[id]->src;
00382 if (priority !=0) (*priority) = q->longMsg[id]->priority;
00383 if (size != 0) (*size) = q->longMsg[id]->size;
00384 }
00385 return id;
00386 }else{
00387 return -1;
00388 }
00389 }
00390
00391 void queue_rx_pop_data_done(queue_t * q, int maxPriId) {
00392 if (maxPriId >= 0 && maxPriId < q->max_msg_num){
00393 clear(q,maxPriId);
00394 WMP_LM_DEBUG(stderr,"Unlock pop done RX\n");
00395 }
00396 exclusive_off(q);
00397 }
00398
00399 void queue_rx_init(queue_t * q, int max_msg_size, int max_msg_num, int num_ports) {
00400
00401 int i, allocated = 0;
00402 q->max_msg_num=max_msg_num;
00403 q->num_ports=num_ports;
00404 q->max_msg_size = max_msg_size;
00405 q->longMsg = (longMsg_t **) MALLOC(max_msg_num * sizeof(longMsg_t *));
00406 allocated+=max_msg_num * sizeof(longMsg_t *);
00407 if (q->longMsg == 0) {
00408 WMP_ERROR(stderr,"Unable to allocate Memory (1)\n");
00409 return;
00410 }
00411
00412 for (i = 0; i < max_msg_num; i++) {
00413
00414 q->longMsg[i] = (longMsg_t *) MALLOC(sizeof(longMsg_t));
00415 allocated+=sizeof(longMsg_t);
00416 if (q->longMsg[i] == 0) {
00417 WMP_ERROR(stderr,"Unable to allocate Memory (2)\n");
00418 return;
00419 }
00420 memset(q->longMsg[i], 0, sizeof(longMsg_t));
00421
00422 q->longMsg[i]->data = MALLOC(max_msg_size);
00423 q->longMsg[i]->allocated_size = max_msg_size;
00424
00425 allocated+=max_msg_size;
00426 if (q->longMsg[i]->data == 0) {
00427 WMP_ERROR(stderr,"Unable to allocate Memory (3)\n");
00428 return;
00429 }
00430 memset(q->longMsg[i]->data, 0, max_msg_size);
00431
00432 q->longMsg[i]->max_message_parts = ((max_msg_size/512)+1);
00433 q->longMsg[i]->received_part = (char *) MALLOC(q->longMsg[i]->max_message_parts);
00434 allocated+=q->longMsg[i]->max_message_parts;
00435 if (q->longMsg[i]->received_part == 0) {
00436 WMP_ERROR(stderr,"Unable to allocate Memory (4)\n");
00437 return;
00438 }
00439 memset(q->longMsg[i]->received_part, 0, q->longMsg[i]->max_message_parts);
00440
00441 }
00442
00443 q->sems = (SEM_T **) MALLOC(num_ports * sizeof(SEM_T *));
00444 q->mtxs = (MUTEX **) MALLOC(num_ports * sizeof(MUTEX *));
00445 allocated+=num_ports * (sizeof(SEM_T *) + sizeof(MUTEX *));
00446 if (q->sems == 0 || q->mtxs == 0) {
00447 WMP_ERROR(stderr,"Unable to allocate Memory (5)\n");
00448 return;
00449 }
00450
00451 for (i = 0; i < num_ports; i++) {
00452 q->sems[i] = (SEM_T *) MALLOC(sizeof(SEM_T));
00453 q->mtxs[i] = (MUTEX *) MALLOC(sizeof(MUTEX));
00454 allocated+=sizeof(SEM_T) + sizeof(MUTEX);
00455 if (q->sems[i] == 0 || q->mtxs[i] == 0) {
00456 WMP_ERROR(stderr,"Unable to allocate Memory (6)\n");
00457 return;
00458 }
00459 SEM_INIT(q->sems[i], 0, 0);
00460 MUTEX_INIT(q->mtxs[i]);
00461 }
00462 MUTEX_INIT(&q->mtx);
00463 MUTEX_INIT(&q->uniq_mtx);
00464
00465 WMP_MSG(stderr,"*** Queues loaded, allocated %d kbytes\n", allocated/1024);
00466 }
00467
00468 void queue_rx_free(queue_t * q) {
00469 int i;
00470 for (i = 0; i < q->max_msg_num; i++) {
00471 FREE(q->longMsg[i]->data);
00472 FREE(q->longMsg[i]->received_part);
00473 FREE(q->longMsg[i]);
00474 }
00475
00476 FREE(q->longMsg);
00477 for (i = 0; i < q->num_ports; i++) {
00478 FREE(q->sems[i]);
00479 FREE(q->mtxs[i]);
00480 }
00481 FREE(q->sems);
00482 FREE(q->mtxs);
00483 }
00484
00485 int queue_rx_get_count(queue_t * q, int port) {
00486 if (port >= q->num_ports || port < 0) {
00487 WMP_ERROR(stderr,"Warning, asking queue_rx_get_count on port %d while only %d\n", port, q->num_ports);
00488 return 0;
00489 } else {
00490 return SEM_GET_COUNT(*q->sems[port]);
00491 }
00492 }
00493 int queue_rx_get_room(queue_t * q){
00494 int i, cnt = 0;
00495 exclusive_on(q);
00496 for (i = 0; i < q->max_msg_num; i++) {
00497 if (q->longMsg[i]->hash == 0){
00498 cnt++;
00499 }
00500 }
00501 exclusive_off(q);
00502 return cnt;
00503 }