queue_rx.c
Go to the documentation of this file.
00001 /*------------------------------------------------------------------------
00002  *---------------------           RT-WMP              --------------------
00003  *------------------------------------------------------------------------
00004  *                                                         V7.0B  11/05/10
00005  *
00006  *
00007  *  File: ./src/plugins/long_messages/long_messages.c
00008  *  Authors: Danilo Tardioli
00009  *  ----------------------------------------------------------------------
00010  *  Copyright (C) 2000-2010, Universidad de Zaragoza, SPAIN
00011  *
00012  *  Contact Addresses: Danilo Tardioli                   dantard@unizar.es
00013  *
00014  *  RT-WMP is free software; you can  redistribute it and/or  modify it
00015  *  under the terms of the GNU General Public License  as published by the
00016  *  Free Software Foundation;  either  version 2, or (at  your option) any
00017  *  later version.
00018  *
00019  *  RT-WMP  is distributed  in the  hope  that  it will be   useful, but
00020  *  WITHOUT  ANY  WARRANTY;     without  even the   implied   warranty  of
00021  *  MERCHANTABILITY  or  FITNESS FOR A  PARTICULAR PURPOSE.    See the GNU
00022  *  General Public License for more details.
00023  *
00024  *  You should have received  a  copy of  the  GNU General Public  License
00025  *  distributed with RT-WMP;  see file COPYING.   If not,  write to the
00026  *  Free Software  Foundation,  59 Temple Place  -  Suite 330,  Boston, MA
00027  *  02111-1307, USA.
00028  *
00029  *  As a  special exception, if you  link this  unit  with other  files to
00030  *  produce an   executable,   this unit  does  not  by  itself cause  the
00031  *  resulting executable to be covered by the  GNU General Public License.
00032  *  This exception does  not however invalidate  any other reasons why the
00033  *  executable file might be covered by the GNU Public License.
00034  *
00035  *----------------------------------------------------------------------*/
00036 #include <stdarg.h>
00037 #include "config/compiler.h"
00038 #include "core/interface/wmp_interface.h"
00039 #include "core/include/global.h"
00040 #include "core/include/frames.h"
00041 #include "include/queue_core.h"
00042 #include "core/include/queues.h"
00043 #include "core/include/wmp_misc.h"
00044 #include "core/include/queue_core.h"
00045 
/* Note: this comment describes extend_size_if_necessary(), defined further below.
 * It extends the capacity of queue element idx. Warning: not real-time behavior.
 * It is intended to allow the reception of big messages without requiring the user
 * to define the maximum size in advance. The same applies to the received_part vector,
 * which is used to allow out-of-order arrival of the message parts.
 */
00051 
/* One remembered (hash, part_id) pair of a message part. */
typedef struct {
        unsigned short hash;
        short part_id;
} memory_t;

/* Table of remembered message parts; memory_id is the slot that the next
 * unseen pair overwrites. */
static memory_t memory[10];
static int memory_id = 0;

/* Looks the pair (hash, part_id) up in `memory`.
 *
 * Returns 1 when some slot holds exactly that pair.  Otherwise the pair is
 * written to slot memory_id, memory_id advances (wrapping to 0 after the last
 * slot) and 0 is returned.
 *
 * The table has static storage and therefore starts zeroed, so the pair
 * (0, 0) is reported as present while any slot still holds zeros.
 */
static int hash_exists(unsigned short hash, short part_id) {
        const int slots = (int) (sizeof(memory) / sizeof(memory[0]));
        const memory_t *entry;

        for (entry = memory; entry != memory + slots; entry++) {
                if (entry->hash == hash && entry->part_id == part_id) {
                        return 1;
                }
        }

        memory[memory_id].hash = hash;
        memory[memory_id].part_id = part_id;
        memory_id = (memory_id + 1 == slots) ? 0 : memory_id + 1;
        return 0;
}
00074 
00075 
00076 static int extend_size_if_necessary(queue_t *q, int idx){
00077         int total_msg_size, msg_size_till_this_part, size;
00078 
00079         total_msg_size = q->longMsg[idx]->num_parts * q->longMsg[idx]->msg_part_size;
00080         msg_size_till_this_part = q->longMsg[idx]->part_id * q->longMsg[idx]->msg_part_size;
00081         size = total_msg_size > msg_size_till_this_part ? total_msg_size : msg_size_till_this_part;
00082 
00083         if (size > q->longMsg[idx]->allocated_size){
00084                 char * dp;
00085                 WMP_ERROR(stderr, "Not enough space in element %d (%d bytes), allocating %d bytes\n",idx,q->longMsg[idx]->allocated_size,total_msg_size);
00086                 dp = MALLOC(size);
00087                 if (dp != 0){
00088                         memcpy(dp, q->longMsg[idx]->data, q->longMsg[idx]->allocated_size);
00089                         FREE(q->longMsg[idx]->data);
00090                         q->longMsg[idx]->data = dp;
00091                         q->longMsg[idx]->allocated_size = size;
00092                 }else{
00093                         WMP_ERROR(stderr, "Error: Unable to allocate memory (queue_push_part), discarding message");
00094                         q->longMsg[idx]->hash = 0;
00095                         return 0;
00096                 }
00097         }
00098 
00099         size = q->longMsg[idx]->num_parts > q->longMsg[idx]->part_id ? q->longMsg[idx]->num_parts : q->longMsg[idx]->part_id;
00100         if (size > q->longMsg[idx]->max_message_parts){
00101                 char * dp;
00102                 WMP_ERROR(stderr, "max_message_parts is too small %d (%d bytes), allocating %d bytes\n",idx,q->longMsg[idx]->max_message_parts,size);
00103                 dp = MALLOC(size);
00104                 if (dp != 0){
00105                         memcpy(dp, q->longMsg[idx]->received_part, q->longMsg[idx]->max_message_parts);
00106                         FREE(q->longMsg[idx]->received_part);
00107                         q->longMsg[idx]->received_part = dp;
00108                         q->longMsg[idx]->max_message_parts = size;
00109                 }else{
00110                         WMP_ERROR(stderr, "Error: Unable to allocate memory for extend message_parts (queue_push_part), discarding message");
00111                         q->longMsg[idx]->hash = 0;
00112                         return 0;
00113                 }
00114         }
00115         return 1;
00116 }
00117 
00118 
00119 int queue_push_part(queue_t * q,  longMsg_t * m) {
00120         int i, idx = -1,  must_signal = 0, cnt=0;
00121         exclusive_on(q);
00122         if (m->port < 0 || m->port >= q->num_ports){
00123                 WMP_ERROR(stderr, "*** WARNING: Discarded message-part due to incorrect port number (port: %d)\n", m->port);
00124                 exclusive_off(q);
00125                 return 0;
00126         }
00127         if (hash_exists(m->hash, m->part_id)){
00128                 static int dropped = 0;
00129                 dropped++;
00130                 if (dropped % 10 == 0){
00131                         WMP_ERROR(stderr, "*** WARNING: Discarded 10 message-parts due existent hash (hash: %d)\n",m->hash);
00132                 }
00133                 exclusive_off(q);
00134                 return 0;
00135         }
00136 
00137         for (i = 0; i < q->max_msg_num; i++) {
00138                 /* Discards unfinished old messages and take the place */
00139                 if (q->longMsg[i]->hash != 0 && !q->longMsg[i]->done) {
00140                         long long age = getRawActualTimeus() - q->longMsg[i]->ts;
00141                         if (age > ((long long) 5000000)) {
00142                                 cnt++;
00143                                 clear(q, i);
00144                         }
00145                 }
00146         }
00147         if (cnt > 0){
00148                 WMP_ERROR(stderr, "Discarding old message-part (count: %d)\n", cnt);
00149         }
00150 
00151         for (i = 0; i < q->max_msg_num; i++) {
00152                 /* Try to find an already existing part */
00153                 if (q->longMsg[i]->hash == m->hash) {
00154                         idx = i;
00155                         break;
00156                 }
00157         }
00158 
00159         if (idx < 0) {
00160                 for (i = 0; i < q->max_msg_num; i++) {
00161                         /* Try to find a free place */
00162                         if (q->longMsg[i]->hash == 0) {
00163                                 idx = i;
00164                                 break;
00165                         }
00166                 }
00167         }
00168 
00169 //      if (idx < 0) {
00170 //              for (i = 0; i < q->max_msg_num; i++) {
00171 //                      /* Discards unfinished old messages and take the place */
00172 //                      if (q->longMsg[i]->hash != 0 && !q->longMsg[i]->done) {
00173 //                              long long age = getRawActualTimeus() - q->longMsg[i]->ts;
00174 //                              if (age > ((long long) 5000000)) {
00175 //                                      idx = i;
00176 //                                      WMP_ERROR(stderr,"Warning: Discard uncompleted message pos:%d "
00177 //                                                      "hash:%d "
00178 //                                                      "num_parts:%d "
00179 //                                                      "received_parts:%d "
00180 //                                                      "size:%d \n", idx, q->longMsg[i]->hash, q->longMsg[i]->num_parts,q->longMsg[i]->received_parts,  q->longMsg[i]->size);
00181 //                                      clear(q, i);
00182 //
00183 //                                      //break;
00184 //                              }
00185 //                      }
00186 //              }
00187 //      }
00188 
00189 //  Warning: to discard you must take into account the situation of the semaphore:
00190 //  If you discard a message you have to take into account if it was complete
00191 //  or not and if the message had already signaled the semaphore
00192 
00193         if (idx >= 0) {
00194                 int done;
00195 
00196                 if (m->part_id < 0) {
00197                         q->longMsg[idx]->num_parts = -m->part_id;
00198                         m->part_id = 0;
00199                 }
00200 
00201                 q->longMsg[idx]->received_part[m->part_id] = 1;
00202                 q->longMsg[idx]->src = m->src;
00203                 q->longMsg[idx]->msg_part_size = m->msg_part_size;
00204                 q->longMsg[idx]->size += m->size;
00205                 q->longMsg[idx]->received_parts++;
00206                 q->longMsg[idx]->hash = m->hash;
00207                 q->longMsg[idx]->port = m->port;
00208                 q->longMsg[idx]->priority = m->priority;
00209                 q->longMsg[idx]->ts = getRawActualTimeus();
00210 
00211                 if (!extend_size_if_necessary(q,idx)){
00212                         exclusive_off(q);
00213                         return 0;
00214                 }
00215 
00216                 memcpy(q->longMsg[idx]->data + (m->part_id * q->longMsg[idx]->msg_part_size), m->data, m->size);
00217 
00218                 done = (q->longMsg[idx]->num_parts > 0);
00219                 for (i=0;i<q->longMsg[idx]->num_parts;i++){
00220                         done = done & q->longMsg[idx]->received_part[i];
00221                 }
00222 
00223                 if (done) {
00224                         WMP_LM_DEBUG(stderr, "DONE hash: %d size:%d port %d \n",
00225                                         q->longMsg[idx]->hash, q->longMsg[idx]->size, q->longMsg[idx]->port);
00226                         q->longMsg[idx]->done = 1;
00227                         must_signal = 1;
00228                 }
00229         } else {
00230                 WMP_ERROR(stderr, "*** (RT-WMP) NO SPACE to receive new message (queued elements:%d)\n", queue_rx_get_count(q,m->port));
00231                 exclusive_off(q);
00232                 return -1;
00233         }
00234 
00235         exclusive_off(q);
00236 
00237         if (must_signal){
00238                 SIGNAL(*q->sems[m->port]);
00239         }
00240         return 0;
00241 }
00242 
00243 int queue_rx_push_loop_data(queue_t * q, unsigned int port, char * p,
00244                 unsigned int size, unsigned int dest, signed char priority, unsigned char src) {
00245         int i, idx =-1;
00246         exclusive_on(q);
00247 
00248         if (port < 0 || port >= q->num_ports) {
00249                 WMP_ERROR(stderr,"Discarding loop data due to incorrect port number (%d) @ loop\n",port);
00250                 exclusive_off(q);
00251                 return 0;
00252         }
00253 
00254         for (i = 0; i < q->max_msg_num; i++) {
00255                 if (q->longMsg[i]->hash == 0) {
00256                         idx = i;
00257                 }
00258         }
00259 
00260         if (idx >= 0) {
00261                 q->longMsg[idx]->hash = -1;
00262                 q->longMsg[idx]->size = size;
00263                 q->longMsg[idx]->dest = dest;
00264                 q->longMsg[idx]->src = src;
00265                 q->longMsg[idx]->port = port;
00266                 q->longMsg[idx]->ts = getRawActualTimeus();
00267                 q->longMsg[idx]->num_parts = 1;
00268                 q->longMsg[idx]->msg_part_size = size;
00269                 q->longMsg[idx]->priority = priority;
00270 
00271                 if (!extend_size_if_necessary(q,idx)){
00272                         exclusive_off(q);
00273                         return 0;
00274                 }
00275 
00276                 memcpy(q->longMsg[idx]->data, p, size);
00277                 q->longMsg[idx]->done = 1;
00278                 exclusive_off(q);
00279                 SIGNAL(*q->sems[port]);
00280                 return 1;
00281         } else {
00282                 exclusive_off(q);
00283                 return 0;
00284         }
00285 }
00286 
00287 static int look_rx(queue_t * q, int port) {
00288         int i, maxPri = -1, maxPriId = -1;
00289         long long age, older_age = 0;
00290         for (i = 0; i < q->max_msg_num; i++) {
00291                 if (q->longMsg[i]->done && q->longMsg[i]->port == port) {
00292                         age = getRawActualTimeus() - q->longMsg[i]->ts;
00293                         if (q->longMsg[i]->priority > maxPri || (q->longMsg[i]->priority == maxPri && age > older_age)) {
00294                                 maxPri = q->longMsg[i]->priority;
00295                                 maxPriId = i;
00296                                 older_age = age;
00297                         }
00298                 }
00299         }
00300         return maxPriId;
00301 }
00302 
00303 /* Interface */
00304 
00305 int queue_rx_get_mpm_size(queue_t * q, int port) {
00306         int i, maxPri = 0, maxPriId = -1;
00307 
00308         if (port >= q->num_ports || port < 0 ){
00309                 WMP_ERROR(stderr," *** (RT-WMP) GetDataSize port number out of range, discarding");
00310                 return -1;
00311         }
00312 
00313         for (i = 0; i < q->max_msg_num; i++) {
00314                 if (q->longMsg[i]->done && q->longMsg[i]->port == port) {
00315                         if (q->longMsg[i]->priority > maxPri) {
00316                                 maxPri = q->longMsg[i]->priority;
00317                                 maxPriId = i;
00318                         }
00319                 }
00320         }
00321         if (maxPriId != -1) {
00322                 return (int) q->longMsg[maxPriId]->size;
00323         } else {
00324                 return -1;
00325         }
00326 }
00327 int queue_rx_wait_data(queue_t * q, int port, int delay){
00328         int ret;
00329         if (port >= q->num_ports || port < 0) {
00330                 WMP_ERROR(stderr,
00331                                 " *** (RT-WMP) Pop port number (%d) out of range (%d), discarding\n", port, q->num_ports);
00332                 return -1;
00333         }
00334         if (delay == 0) {
00335                 ret = WAIT(*q->sems[port]);
00336         } else if (delay > 0){
00337                 ret = WAIT_TIMED(*q->sems[port],delay);
00338         }else{
00339                 if (queue_rx_get_count(q,port)>0){
00340                         ret = 0;
00341                 }else{
00342                         ret = -1;
00343                 }
00344         }
00345         if (ret == 0){
00346                 int id;
00347                 exclusive_on(q);
00348                 id = look_rx(q, port);
00349                 exclusive_off(q);
00350                 return id;
00351         }else{
00352                 return -1;
00353         }
00354 }
00355 
00356 int queue_rx_pop_data(queue_t * q, unsigned int delay, unsigned int port, char ** data, unsigned int * size,
00357                 unsigned char * src, signed char * priority) {
00358         int id, ret = 0;
00359         if (port >= q->num_ports || port < 0) {
00360                 WMP_ERROR(stderr,
00361                                 " *** (RT-WMP) Pop port number (%d) out of range (%d), discarding\n", port, q->num_ports);
00362                 exclusive_on(q);
00363                 return -1;
00364         }
00365 
00366         if (delay == 0) {
00367                 ret = WAIT(*q->sems[port]);
00368         } else if (delay >0){
00369                 ret = WAIT_TIMED(*q->sems[port],delay);
00370         }else{
00371                 ret = 0;
00372         }
00373 
00374         exclusive_on(q);
00375 
00376         if (ret == 0) {
00377                 WMP_LM_DEBUG(stderr,"Lock pop data RX\n");
00378                 id = look_rx(q, port);
00379                 if (id >= 0 && id < q->max_msg_num) {
00380                         if (data != 0)     (*data)     = q->longMsg[id]->data;
00381                         if (src != 0)     (*src)      = q->longMsg[id]->src;
00382                         if (priority !=0) (*priority) = q->longMsg[id]->priority;
00383                         if (size != 0)    (*size)     = q->longMsg[id]->size;
00384                 }
00385                 return id;
00386         }else{
00387                 return -1;
00388         }
00389 }
00390 
00391 void queue_rx_pop_data_done(queue_t * q, int maxPriId) {
00392         if (maxPriId >= 0 && maxPriId < q->max_msg_num){
00393                 clear(q,maxPriId);
00394                 WMP_LM_DEBUG(stderr,"Unlock pop done RX\n");
00395         }
00396         exclusive_off(q);
00397 }
00398 
00399 void queue_rx_init(queue_t * q, int max_msg_size, int max_msg_num, int num_ports) {
00400 
00401         int i, allocated = 0;
00402         q->max_msg_num=max_msg_num;
00403         q->num_ports=num_ports;
00404         q->max_msg_size = max_msg_size;
00405         q->longMsg = (longMsg_t **) MALLOC(max_msg_num * sizeof(longMsg_t *));
00406         allocated+=max_msg_num * sizeof(longMsg_t *);
00407         if (q->longMsg == 0) {
00408                 WMP_ERROR(stderr,"Unable to allocate Memory (1)\n");
00409                 return;
00410         }
00411 
00412         for (i = 0; i < max_msg_num; i++) {
00413 
00414                 q->longMsg[i] = (longMsg_t *) MALLOC(sizeof(longMsg_t));
00415                 allocated+=sizeof(longMsg_t);
00416                 if (q->longMsg[i] == 0) {
00417                         WMP_ERROR(stderr,"Unable to allocate Memory (2)\n");
00418                         return;
00419                 }
00420                 memset(q->longMsg[i], 0, sizeof(longMsg_t));
00421 
00422                 q->longMsg[i]->data = MALLOC(max_msg_size);
00423                 q->longMsg[i]->allocated_size = max_msg_size;
00424 
00425                 allocated+=max_msg_size;
00426                 if (q->longMsg[i]->data == 0) {
00427                         WMP_ERROR(stderr,"Unable to allocate Memory (3)\n");
00428                         return;
00429                 }
00430                 memset(q->longMsg[i]->data, 0, max_msg_size);
00431 
00432                 q->longMsg[i]->max_message_parts = ((max_msg_size/512)+1);
00433                 q->longMsg[i]->received_part = (char *) MALLOC(q->longMsg[i]->max_message_parts);
00434                 allocated+=q->longMsg[i]->max_message_parts;
00435                 if (q->longMsg[i]->received_part == 0) {
00436                         WMP_ERROR(stderr,"Unable to allocate Memory (4)\n");
00437                         return;
00438                 }
00439                 memset(q->longMsg[i]->received_part, 0, q->longMsg[i]->max_message_parts);
00440 
00441         }
00442 
00443         q->sems = (SEM_T **) MALLOC(num_ports * sizeof(SEM_T *));
00444         q->mtxs = (MUTEX **) MALLOC(num_ports * sizeof(MUTEX *));
00445         allocated+=num_ports * (sizeof(SEM_T *) +  sizeof(MUTEX *));
00446         if (q->sems == 0 || q->mtxs == 0) {
00447                 WMP_ERROR(stderr,"Unable to allocate Memory (5)\n");
00448                 return;
00449         }
00450 
00451         for (i = 0; i < num_ports; i++) {
00452                 q->sems[i] = (SEM_T *) MALLOC(sizeof(SEM_T));
00453                 q->mtxs[i] = (MUTEX *) MALLOC(sizeof(MUTEX));
00454                 allocated+=sizeof(SEM_T) + sizeof(MUTEX);
00455                 if (q->sems[i] == 0 || q->mtxs[i] == 0) {
00456                         WMP_ERROR(stderr,"Unable to allocate Memory (6)\n");
00457                         return;
00458                 }
00459                 SEM_INIT(q->sems[i], 0, 0);
00460                 MUTEX_INIT(q->mtxs[i]);
00461         }
00462         MUTEX_INIT(&q->mtx);
00463         MUTEX_INIT(&q->uniq_mtx);
00464 
00465         WMP_MSG(stderr,"*** Queues loaded, allocated %d kbytes\n", allocated/1024);
00466 }
00467 
00468 void queue_rx_free(queue_t * q) {
00469         int i;
00470         for (i = 0; i < q->max_msg_num; i++) {
00471                         FREE(q->longMsg[i]->data);
00472                         FREE(q->longMsg[i]->received_part);
00473                         FREE(q->longMsg[i]);
00474         }
00475 
00476         FREE(q->longMsg);
00477         for (i = 0; i < q->num_ports; i++) {
00478                 FREE(q->sems[i]);
00479                 FREE(q->mtxs[i]);
00480         }
00481         FREE(q->sems);
00482         FREE(q->mtxs);
00483 }
00484 
00485 int queue_rx_get_count(queue_t * q, int port) {
00486         if (port >= q->num_ports || port < 0) {
00487                 WMP_ERROR(stderr,"Warning, asking queue_rx_get_count on port %d while only %d\n", port, q->num_ports);
00488                 return 0;
00489         } else {
00490                 return SEM_GET_COUNT(*q->sems[port]);
00491         }
00492 }
00493 int queue_rx_get_room(queue_t * q){
00494         int i, cnt = 0;
00495         exclusive_on(q);
00496     for (i = 0; i < q->max_msg_num; i++) {
00497         if (q->longMsg[i]->hash == 0){
00498                 cnt++;
00499         }
00500     }
00501     exclusive_off(q);
00502     return cnt;
00503 }


ros_rt_wmp
Author(s): Danilo Tardioli, dantard@unizar.es
autogenerated on Fri Jan 3 2014 12:07:55