queue_tx.c
Go to the documentation of this file.
00001 /*------------------------------------------------------------------------
00002  *---------------------           RT-WMP              --------------------
00003  *------------------------------------------------------------------------
00004  *                                                         V7.0B  11/05/10
00005  *
00006  *
00007  *  File: ./src/plugins/long_messages/long_messages.c
00008  *  Authors: Danilo Tardioli
00009  *  ----------------------------------------------------------------------
00010  *  Copyright (C) 2000-2010, Universidad de Zaragoza, SPAIN
00011  *
00012  *  Contact Addresses: Danilo Tardioli                   dantard@unizar.es
00013  *
00014  *  RT-WMP is free software; you can  redistribute it and/or  modify it
00015  *  under the terms of the GNU General Public License  as published by the
00016  *  Free Software Foundation;  either  version 2, or (at  your option) any
00017  *  later version.
00018  *
00019  *  RT-WMP  is distributed  in the  hope  that  it will be   useful, but
00020  *  WITHOUT  ANY  WARRANTY;     without  even the   implied   warranty  of
00021  *  MERCHANTABILITY  or  FITNESS FOR A  PARTICULAR PURPOSE.    See the GNU
00022  *  General Public License for more details.
00023  *
00024  *  You should have received  a  copy of  the  GNU General Public  License
00025  *  distributed with RT-WMP;  see file COPYING.   If not,  write to the
00026  *  Free Software  Foundation,  59 Temple Place  -  Suite 330,  Boston, MA
00027  *  02111-1307, USA.
00028  *
00029  *  As a  special exception, if you  link this  unit  with other  files to
00030  *  produce an   executable,   this unit  does  not  by  itself cause  the
00031  *  resulting executable to be covered by the  GNU General Public License.
00032  *  This exception does  not however invalidate  any other reasons why the
00033  *  executable file might be covered by the GNU Public License.
00034  *
00035  *----------------------------------------------------------------------*/
00036 #include <stdarg.h>
00037 #include "config/compiler.h"
00038 #include "core/interface/wmp_interface.h"
00039 #include "core/include/global.h"
00040 #include "core/include/frames.h"
00041 #include "include/queue_core.h"
00042 #include "core/include/queues.h"
00043 #include "core/include/wmp_misc.h"
00044 #include "core/include/queue_core.h"
00045 
00046 #ifdef USE_LONG_MESSAGE_COMPRESS
00047 #include <zlib.h>
00048 static Bytef * zd;
00049 static uLongf zlen;
00050 #endif
00051 
00052 /*  ****  LONG MESSAGES */
00053 
00054 static int look_tx(queue_t * q) {
00055         int i, maxPri = -1, maxPriId = -1;
00056         long long age, older_age = 0;
00057         for (i = 0; i < q->max_msg_num; i++) {
00058                 if (q->longMsg[i]->done) {
00059                         age = getRawActualTimeus() - q->longMsg[i]->ts;
00060                         if (q->longMsg[i]->priority > maxPri || (q->longMsg[i]->priority
00061                                         == maxPri && age > older_age)) {
00062                                 maxPri = q->longMsg[i]->priority;
00063                                 maxPriId = i;
00064                                 older_age = age;
00065                         }
00066                 }
00067         }
00068         return maxPriId;
00069 }
00070 
00071 static int extend_size_if_necessary(queue_t *q, int idx, int size){
00072         if (size > q->longMsg[idx]->allocated_size){
00073                 char * dp;
00074                 WMP_ERROR(stderr, "Not enough space in element %d (%d bytes), allocating %d bytes\n",idx,q->longMsg[idx]->allocated_size,size);
00075                 dp = MALLOC(size);
00076                 if (dp != 0){
00077                         FREE(q->longMsg[idx]->data);
00078                         q->longMsg[idx]->data = dp;
00079                         q->longMsg[idx]->allocated_size = size;
00080                 }else{
00081                         WMP_ERROR(stderr, "Error: Unable to allocate memory (queue_push_part), discarding message");
00082                         q->longMsg[idx]->hash = 0;
00083                         return 0;
00084                 }
00085         }
00086         return 1;
00087 }
00088 
00089 
00090 int queue_tx_push_data(queue_t * q, unsigned int port, char * p, unsigned int size,
00091                 unsigned int dest, signed char priority) {
00092         int i, nparts, must_signal = 0;
00093         int a = wmpGetNodeId();
00094         unsigned short ts = (unsigned short) (getRawActualTimeus() & 0x7F);
00095 
00096         if (q->drop_next){
00097                 q->drop_next = 0;
00098                 return 1;
00099         }
00100         exclusive_on(q);
00101 
00102         a = a << 11;
00103         ts = ts << 5;
00104 
00105         q->hash_idx++;
00106         if (q->hash_idx > 14) {
00107                 q->hash_idx = 1;
00108         }
00109 
00110         nparts = size / q->message_part_size;
00111         if (size % q->message_part_size != 0){
00112                 nparts++;       
00113         }
00114 
00115         for (i = 0; i < q->max_msg_num; i++) {
00116                 if (q->longMsg[i]->hash == 0) {
00117 
00118                         if (!extend_size_if_necessary(q, i, size)) {
00119                                 exclusive_off(q);
00120                                 return 0;
00121                         }
00122 
00123                         q->longMsg[i]->size = size;
00124                         q->longMsg[i]->dest = dest;
00125                         q->longMsg[i]->port = port;
00126                         q->longMsg[i]->priority = priority;
00127                         q->longMsg[i]->hash = a + ts +q->hash_idx;
00128                         //fprintf(stderr,"TX Hash:%d : %d, %d, %d\n", q->longMsg[i]->hash,a,ts,q->hash_idx);
00129 
00130                         q->longMsg[i]->ts = getRawActualTimeus();
00131                         q->longMsg[i]->msg_part_size = q->message_part_size;
00132                         q->longMsg[i]->num_parts = nparts;
00133                         q->longMsg[i]->parts_pointer = 0;
00134                         q->longMsg[i]->this_part_size = 0;
00135 
00136                         /* Force Burst */
00137                         if (q->force_burst == port) {
00138                                 q->longMsg[i]->burst_hash = a + 2047;
00139                         } else {
00140                                 q->longMsg[i]->burst_hash = q->longMsg[i]->hash;
00141                         }
00142 
00143                         memcpy(q->longMsg[i]->data, p, size);
00144                         q->elem++;
00145                         q->longMsg[i]->done = 1;
00146                         must_signal = 1;
00147                         break;
00148                 }
00149         }
00150         exclusive_off(q);
00151         if (must_signal) {
00152                 for (i = 0; i < nparts; i++) {
00153                         SIGNAL(q->sem);
00154                 }
00155                 return 1;
00156         }else{
00157                 return 0;
00158         }
00159 }
00160 void queue_tx_force_burst(queue_t * q, int port) {
00161         q->force_burst = port;
00162 }
00163 
00164 int queue_tx_get_count(queue_t * q) {
00165         return SEM_GET_COUNT(q->sem);
00166 }
00167 
00168 int queue_tx_get_port_period(queue_t * q) {
00169         return SEM_GET_COUNT(q->sem);
00170 }
00171 
00172 int queue_tx_pop_part(queue_t * q, longMsg_t ** p) {
00173         int id, ret = 0;
00174 
00175         ret = WAIT_TIMED(q->sem,5000);
00176 
00177         exclusive_on(q);
00178         if (ret == 0) {
00179                 WMP_LM_DEBUG(stderr,"Lock pop data (TX)\n");
00180                 id = look_tx(q);
00181                 if (p!=0 && id >= 0 && id < q->max_msg_num) {
00182 
00183                         /* Fill pointer */
00184                         q->longMsg[id]->pointer = q->longMsg[id]->data + (q->longMsg[id]->parts_pointer * q->message_part_size);
00185 
00186                         /* Fill this_part_size */
00187                         if (q->longMsg[id]->num_parts == 1){
00188                                 q->longMsg[id]->this_part_size = q->longMsg[id]->size;
00189                         }else{
00190                                 if (q->longMsg[id]->parts_pointer == (q->longMsg[id]->num_parts -1)){
00191                                         q->longMsg[id]->this_part_size = q->longMsg[id]->size - (q->message_part_size * q->longMsg[id]->parts_pointer) ;
00192                                 }else{
00193                                         q->longMsg[id]->this_part_size = q->message_part_size;
00194                                 }
00195                         }
00196 
00197                         /* Fill part_id */
00198                         if (q->longMsg[id]->parts_pointer == 0){
00199                                 q->longMsg[id]->part_id = - q->longMsg[id]->num_parts;
00200                         }else{
00201                                 q->longMsg[id]->part_id = q->longMsg[id]->parts_pointer;
00202                         }
00203                         *p = q->longMsg[id];
00204                         q->last_popped_id = id;
00205                 }
00206                 return id;
00207         } else {
00208                 return -1;
00209         }
00210 }
00211 
00212 int queue_tx_reschedule(queue_t * q) {
00213         if (q->last_popped_id >= 0 && q->last_popped_id < q->max_msg_num) {
00214                 exclusive_on(q);
00215                 q->longMsg[q->last_popped_id]->rescheduled = 1;
00216                 exclusive_off(q);
00217                 SIGNAL(q->sem);
00218                 return 1;
00219         }
00220         return 0;
00221 }
00222 
00223 int queue_tx_confirm(queue_t * q) {
00224         if (q->last_popped_id >= 0 && q->last_popped_id < q->max_msg_num) {
00225                 exclusive_on(q);
00226                 q->longMsg[q->last_popped_id]->rescheduled = 0;
00227                 q->longMsg[q->last_popped_id]->parts_pointer++;
00228                 if (q->longMsg[q->last_popped_id]->parts_pointer
00229                                 == q->longMsg[q->last_popped_id]->num_parts) {
00230                         clear(q, q->last_popped_id);
00231                         q->elem--;
00232                 }
00233                 exclusive_off(q);
00234                 return 1;
00235         }
00236         return 0;
00237 }
00238 
00239 void queue_tx_get_last_popped_info(queue_t * q, int * age, int *port, int * priority){
00240         if (q->last_popped_id >= 0 && q->last_popped_id < q->max_msg_num){
00241                 *age = ((int) (getRawActualTimeus() - q->longMsg[q->last_popped_id]->ts))/1000;
00242                 *port =  q->longMsg[q->last_popped_id]->port;
00243                 *priority =  q->longMsg[q->last_popped_id]->priority;
00244         }
00245 }
00246 
00247 int queue_tx_inspect_head(queue_t * q, longMsg_t ** p) {
00248         int maxPriId = look_tx(q);
00249         if (maxPriId > 0) {
00250                 if (p != 0) {
00251                         *p = q->longMsg[maxPriId];
00252                 }
00253                 return (int) (getRawActualTimeus - q->longMsg[maxPriId]->ts);
00254         } else {
00255                 return -1;
00256         }
00257 }
00258 
00259 int queue_tx_get_head_dest(queue_t * q){
00260         int maxPriId = look_tx(q);
00261         if (maxPriId>=0){
00262                 return q->longMsg[maxPriId]->dest;
00263         }else{
00264                 return -1;
00265         }
00266 }
00267 int queue_tx_get_head_age(queue_t * q){
00268         int maxPriId = look_tx(q);
00269         if (maxPriId>=0){
00270                 return (int) (getRawActualTimeus() - q->longMsg[maxPriId]->ts);
00271         }else{
00272                 return -1;
00273         }
00274 }
00275 
00276 void queue_tx_pop_part_done(queue_t * q, int maxPriId) {
00277         exclusive_off(q);
00278 }
00279 
00280 int queue_tx_remove_head(queue_t * q){
00281         int id = queue_tx_pop_part(q,0);
00282         queue_tx_pop_part_done(q,id);
00283         return id;
00284 }
00285 
00286 void queue_tx_drop_next(queue_t * q){
00287         q->drop_next = 1;
00288 }
00289 
00290 int queue_tx_get_head_id(queue_t * q){
00291         int maxPriId = look_tx(q);
00292         return maxPriId;
00293 }
00294 void queue_tx_drop_elem(queue_t * q, int id){
00295         exclusive_on(q);
00296         clear(q,id);
00297         if (queue_tx_get_count(q)>0){
00298                 WAIT(q->sem);
00299         }
00300         exclusive_off(q);
00301 }
00302 
00303 int wmp_queue_tx_get_head_dest(void);
00304 
00305 void queue_tx_init(queue_t * q, int max_msg_size, int max_msg_num, int msg_part_size) {
00306 
00307         int i, allocated = 0;
00308         q->max_msg_num = max_msg_num;
00309         q->max_msg_size = max_msg_size;
00310         q->message_part_size = msg_part_size;
00311         q->longMsg = (longMsg_t **) MALLOC(max_msg_num * sizeof(longMsg_t *));
00312         allocated += max_msg_num * sizeof(longMsg_t *);
00313         if (q->longMsg == 0) {
00314                 WMP_ERROR(stderr,"Unable to allocate Memory (1)\n");
00315                 return;
00316         }
00317 
00318         for (i = 0; i < max_msg_num; i++) {
00319                 q->longMsg[i] = (longMsg_t *) MALLOC(sizeof(longMsg_t));
00320                 allocated += sizeof(longMsg_t);
00321                 if (q->longMsg[i] == 0) {
00322                         WMP_ERROR(stderr,"Unable to allocate Memory (2)\n");
00323                         return;
00324                 }
00325                 memset(q->longMsg[i], 0, sizeof(longMsg_t));
00326 
00327                 q->longMsg[i]->data = MALLOC(max_msg_size);
00328                 allocated += max_msg_size;
00329                 if (q->longMsg[i]->data == 0) {
00330                         WMP_ERROR(stderr,"Unable to allocate Memory (3)\n");
00331                         return;
00332                 }
00333                 memset(q->longMsg[i]->data, 0, max_msg_size);
00334                 q->longMsg[i]->allocated_size = max_msg_size;
00335                 q->force_burst = -1;
00336         }
00337 
00338         SEM_INIT(&q->sem, 0, 0);
00339         MUTEX_INIT(&q->uniq_mtx);
00340 
00341         WMP_MSG(stderr,"*** Queues loaded, allocated %d kbytes\n", allocated/1024);
00342 }
00343 
00344 void queue_tx_free(queue_t * q) {
00345         int i;
00346         for (i = 0; i < q->max_msg_num; i++) {
00347                 FREE(q->longMsg[i]->data);
00348                 FREE(q->longMsg[i]);
00349         }
00350         FREE(q->longMsg);
00351 
00352 #ifdef USE_LONG_MESSAGE_COMPRESS
00353         free(zd);
00354 #endif
00355 
00356 }


ros_rt_wmp
Author(s): Danilo Tardioli, dantard@unizar.es
autogenerated on Mon Oct 6 2014 08:27:10