Wiselib
wiselib.testing/radio/reliable/reliableradio.h
Go to the documentation of this file.
00001 /*
00002  * File:   reliableradio.h
00003  * Author: amaxilatis
00004  *
00005  * Created on July 27, 2010
00006  */
00007 
00008 #ifndef _RELIABLERADIO_H
00009 #define  _RELIABLERADIO_H
00010 
00011 //#include "util/delegates/delegate.hpp"
00012 #include "util/pstl/vector_static.h"
00013 #include "util/pstl/pair.h"
00014 #include "util/delegates/delegate.hpp"
00015 #include "util/base_classes/radio_base.h"
00016 #include "reliablemsg.h"
00017 
00018 //#include "communication/reliablemsg.h"
00019 
00020     /*
00021      * DEBUG MESSAGES TEMPLATE
00022      * ReliableRadio::<task> [ type= ...] 
00023      *
00024      * */
00025 //#define DEBUG_RELIABLERADIO
00026 //#define EXTRA
00027 #ifdef ISENSE_RADIO_ADDR_TYPE
00028     #define ISENSE_APP
00029 #endif
00030 
00031 #define MAX_PENDING 10  // Maximum number of pending for delivery messages
00032 #define TIMEOUT 800     // Timeout for time to pass
00033 #define MAX_CONNECTIONS 20
00034 #define MAX_RECV 15
00035 
00036 namespace wiselib {
00037 
/*
 * ReliableRadio Template
 * Uses OsModel, Radio and Timer
 * Debug is only used for debug output
 * ReliableRadio is a layer between the normal radio
 * and the application that makes sure messages are
 * delivered despite any errors that may occur.
 * */
00046 template<typename OsModel_P,
00047          typename Radio_P,
00048          typename Timer_P,
00049          typename Debug_P>
00050 class ReliableRadio
00051     : public RadioBase<OsModel_P,
00052                        typename Radio_P::node_id_t,
00053                        typename Radio_P::size_t,
00054                        typename Radio_P::block_data_t>
00055 {
00056 
00057 public:
00058     // Type definitions
00059     typedef OsModel_P OsModel;
00060 
00061     typedef Radio_P Radio;
00062     typedef typename Radio::node_id_t node_id_t;
00063     typedef typename Radio::size_t size_t;
00064     typedef typename Radio::block_data_t block_data_t;
00065 
00066     typedef typename Radio::message_id_t message_id_t;
00067     typedef ReliableMsg< OsModel,Radio> ReliableMessage; // a Reliable message with some extra headers for delivery checks
00068 
00069     typedef delegate4<void, uint8_t, node_id_t, size_t, block_data_t*> event_notifier_delegate_t;
00070 
    // Bookkeeping stored next to every message in the pending vector.
    struct information {
        // sequence number of the stored message; 0 marks the slot as free
        uint16_t seq_no;
        // node the message was sent to
        node_id_t destination;
        // value of current_time_ when the message was last (re)sent
        int timestamp;
        // number of retransmissions done so far
        int retries;
        // optional per-message notifier, called with MSG_DROPPED or MSG_ACK_RCVD
        event_notifier_delegate_t event_notifier_callback;
    }; // stores information about the status of a sent message
00078 
00079     typedef struct information information_t;
00080     typedef Debug_P Debug;
00081     typedef Timer_P Timer;
00082 
00083     // Vector Storing all messages sent but receive not confirmed yet
00084     typedef wiselib::vector_static<OsModel,
00085     pair<information, ReliableMessage>, MAX_PENDING> vector_t;
00086 
00087     typedef ReliableRadio<OsModel_P, Radio_P, Timer_P, Debug_P> self_t;
00088 
00089 
00090     // --------------------------------------------------------------------
00091 
    // Event codes handed as first argument to a per-message notifier callback.
    enum Events {
        // sent from reliable_daemon when the retry limit was reached
        MSG_DROPPED = 1,
        // sent from receive when a matching acknowledgement arrived
        MSG_ACK_RCVD = 2
    };
00096 
    // Special node addresses, taken over unchanged from the underlying radio.
    enum SpecialNodeIds {
        BROADCAST_ADDRESS = Radio::BROADCAST_ADDRESS, 
        // also used in open_connections to mark an unused connection entry
        NULL_NODE_ID = Radio::NULL_NODE_ID
    };
00102     // --------------------------------------------------------------------
00103 
    // Size limit of this radio layer, forwarded from the message class.
    enum Restrictions {

        MAX_MESSAGE_LENGTH = ReliableMessage::MAX_MESSAGE_LENGTH
    };
00109     // --------------------------------------------------------------------
00110 
00111     void init (Radio& radio , Timer& timer , Debug& debug){
00112         radio_ = &radio;
00113         timer_ = &timer;
00114         debug_ = &debug;
00115     };
00116 
00117     ReliableRadio() {
00118         MAX_RETRIES = 5;
00119 
00120         acked_messages_=0;
00121     }
00122     ;
00123 
00124     ~ReliableRadio() {
00125 
00126     }
00127     ;
00128 
00129 
00130 
00131 
00132      // Enable the Radio
00133 
00134     void enable_radio() {
00135 #ifdef DEBUG_RELIABLERADIO
00136        debug().debug("ReliableRadio::enable\n");
00137 #endif
00138 
00139 #ifdef ISENSE_APP
00140         debug().debug("Hello isense world\n");
00141 #else
00142         debug().debug("running in shawn\n");
00143 #endif
00144 
00145         //set the local os pointer
00146         //set_os(os_);
00147 
00148         //enable normal radio
00149         //Radio::enable(os());
00150         radio().enable_radio();
00151 
00152         radio().template reg_recv_callback<self_t,
00153                 &self_t::receive > ( this);
00154         // initialize the sequense_numbers to 1 (0 means empty)
00155         sequence_numbers_ = 1;
00156         // initialize the Vector to empty ( seq_no=0 means empty)
00157         for (int i = 0; i < MAX_PENDING; i++) {
00158             pending_messages_[i].first.seq_no = 0;
00159             pending_messages_[i].first.event_notifier_callback = event_notifier_delegate_t();
00160         }
00161         for (int i=0;i < MAX_CONNECTIONS; i++){
00162             open_connections[i].conn_id=NULL_NODE_ID;
00163             open_connections[i].sequence_numbers_=1;
00164             open_connections[i].received_count=0;
00165         }
00166         // timestamps start from 0
00167         current_time_ = 0;
00168         // set timer to change time
00169         timer().template set_timer<self_t, &self_t::time_passes > (
00170                 time_slice_, this, (void*) 0);
00171         // set timer to wake up realiable_deamon ( checks for undelivered messages and resends them )
00172         timer().template set_timer<self_t, &self_t::reliable_daemon > (
00173                 sleep_time_, this, (void*) 0);
00174 
00175     }
00176     ;
00177 
00178     // --------------------------------------------------------------------
00179 
00180     void disable_radio() {
00181                 radio().unreg_recv_callback(recv_callback_id_);
00182     };
00183 
00184     // Send a message the application has requested
00185 //  template<class T, void (T::*TMethod)(uint8_t, node_id_t, size_t, block_data_t*)>
00186   //  void sendX( node_id_t id, uint8_t len, uint8_t *data,T *obj_pnt)
00187   template<class T, void (T::*TMethod)(uint8_t, node_id_t, size_t, block_data_t*)>
00188   void send_callback(node_id_t from, size_t len, block_data_t *data,T *obj_pnt )
00189   {
00190 
00191       int pending_id = 0;
00192       if ( (pending_id = send(from,len,data)) != -1) {
00193           pending_messages_[pending_id].first.event_notifier_callback = event_notifier_delegate_t::template from_method<T, TMethod>( obj_pnt );
00194       }
00195 
00196   };
00197 
00198     int send( node_id_t id, size_t len, block_data_t *data) {
00199         //set_os(os_);
00200         if (id == BROADCAST_ADDRESS) {
00201             // create a message to host the data
00202             ReliableMessage m ;
00203             m.set_msg_id(ReliableMessage::BROADCAST_MESSAGE);
00204             m.set_seq_number(0);
00205             m.set_payload(len, data);
00206             // get the real payload to send
00207 
00208 #ifdef DEBUG_RELIABLERADIO
00209             debug().debug("ReliableRadio::send [ type= %d(broad) size= %d]\n", m.msg_id() , m.buffer_size() );
00210 #endif
00211 
00212             // send a broadcast message
00213             radio().send(id, m.buffer_size(), (uint8_t * ) &m );
00214             return -1;
00215         }
00216             /*
00217              * ADD message to Vector
00218              * CREATE a special message containing the original
00219              * SEND through normal radio
00220              * WAIT for ack or reliability_daemon to resend it
00221              * */
00222         else {
00223             int pending_id = 0; // slot in the reliable vector available for new message
00224             pending_id = add_message(ReliableMessage::RELIABLE_MESSAGE, id, len, data); // add the message to the vector
00225             // If no position available inform APPLICATION
00226             // TODO: Inform application for error
00227             if (pending_id == -1) {
00228 #ifdef DEBUG_RELIABLERADIO
00229                 debug().debug( "ReliableRadio::error [ type= vector_full ]\n");
00230 #endif
00231             }// Get message from the Vector and send it through normal radio
00232             else {
00233 
00234 #ifdef DEBUG_RELIABLERADIO
00235 #ifdef ISENSE_APP
00236                 debug().debug("ReliableRadio::send [ type= %d(reliable) destination= %x seq_no= %d size= %d]", ReliableMessage::RELIABLE_MESSAGE, id, pending_messages_[pending_id].second.msg_id(), pending_messages_[pending_id].second.buffer_size());
00237 #else
00238                 debug().debug("ReliableRadio::send [ type= %d(reliable) destination= %d seq_no= %d size= %d]\n", ReliableMessage::RELIABLE_MESSAGE, id, pending_messages_[pending_id].second.msg_id(), pending_messages_[pending_id].second.buffer_size());
00239 #endif
00240 #endif
00241                 // send message through normal radio
00242                 radio().send(id, pending_messages_[pending_id].second.buffer_size(), (uint8_t *) & pending_messages_[pending_id].second);
00243             }
00244 
00245             return pending_id;
00246         }
00247     }
00248     ;
00249 
00250 
00251 
00252     // Adds a message to the Vector and initializes the information struct conserning it
00253     int add_message(int msg_id, node_id_t destination, uint8_t len, uint8_t * data) {
00254         int conn_id = check_connection(destination);
00255         int pos;
00256         // find a free slot in the pending messages Vector to store the message Else return -1
00257         if ((pos = find_postition()) != -1) {
00258             // Store information about the message
00259             pending_messages_[pos].first.seq_no = open_connections[conn_id].sequence_numbers_++;
00260             pending_messages_[pos].first.destination = destination;
00261             pending_messages_[pos].first.timestamp = current_time_;
00262             pending_messages_[pos].first.retries = 0;
00263             // Store the message and set its headers
00264             pending_messages_[pos].second.set_msg_id(msg_id);
00265             pending_messages_[pos].second.set_seq_number(open_connections[conn_id].sequence_numbers_ - 1);
00266             pending_messages_[pos].second.set_payload(len, data);
00267             if (sequence_numbers_ == 0) {
00268                 sequence_numbers_++;
00269             }
00270             return pos;
00271         }
00272         return -1;
00273     }
00274     ;
00275 
00276     // --------------------------------------------------------------------
00277 
00278 
00279     // Increment time by 1
00280 
00281     void time_passes(void *) {
00282         current_time_++;
00283         //Debug::debug(os(),"Time is now %d\n",current_time_);
00284         timer().template set_timer<self_t, &self_t::time_passes > (
00285                 time_slice_, this, (void*) 0);
00286 
00287     }
00288     ;
00289 
00290     // --------------------------------------------------------------------
00291 
00292     /*
00293      * Reliable daemon
00294      * Check to see if any messages are sent
00295      * but not acked so to send them again.
00296      *
00297      *      check by timestamps
00298      *          if time for ack receive has passed
00299      *      and retries
00300      *          if maximum retries reached abort
00301      *
00302      * Reset timer for daemon
00303      */
00304 
00305     void reliable_daemon(void *) {
00306         bool found = false; // true if a message is to be resent
00307         for (int i = 0; i < MAX_PENDING; i++) { // search the message vector
00308             if (pending_messages_[i].first.seq_no != 0) { // only for valid entries : 0 means no message
00309                 if (pending_messages_[i].first.timestamp < current_time_ - 2) { // 2 time units needed for message delivery
00310                     if (pending_messages_[i].first.retries < MAX_RETRIES) { // if max retries reached
00311                         // a message is to be resend
00312                         pending_messages_[i].first.timestamp = current_time_; // refresh its sent timestamp
00313 
00314                         size_t mess_size = pending_messages_[i].second.buffer_size();
00315 
00316                         mess_size = mess_size;
00317 
00318                         // resend the message
00319                         radio().send(
00320                                 pending_messages_[i].first.destination,
00321                                 pending_messages_[i].second.buffer_size(),
00322                                 (uint8_t *) & pending_messages_[i].second);
00323                         // increase by 1 retries
00324                         pending_messages_[i].first.retries++;
00325 
00326 #ifdef DEBUG_RELIABLERADIO
00327 #ifdef ISENSE_APP
00328                         debug().debug("ReliableRadio::resend [ type= %d(reliable) destination= %x seq_no= %d size= %d]", ReliableMessage::RELIABLE_MESSAGE,
00329                                 pending_messages_[i].first.destination,
00330                                 pending_messages_[i].first.seq_no,
00331                                 mess_size
00332                                 );
00333 #else
00334                         debug().debug("ReliableRadio::resend [ type= %d(reliable) destination= %d seq_no= %d size= %d]\n", ReliableMessage::RELIABLE_MESSAGE,
00335                                 pending_messages_[i].first.destination,
00336                                 pending_messages_[i].first.seq_no,
00337                                 mess_size
00338                                 );
00339 #endif
00340 #endif
00341                     } else { // if max retries reached abort sending
00342 
00343 #ifdef DEBUG_RELIABLERADIO
00344 #ifdef ISENSE_APP
00345 
00346                         debug().debug("ReliableRadio::abort [ type= %d(reliable) destination= %x seq_no= %d  max_retries]", ReliableMessage::RELIABLE_MESSAGE,
00347                                 pending_messages_[i].first.destination,
00348                                 pending_messages_[i].first.seq_no);
00349 #else
00350                         debug().debug("ReliableRadio::abort [ type= %d(reliable) destination= %d seq_no= %d  max_retries]\n", ReliableMessage::RELIABLE_MESSAGE,
00351                                 pending_messages_[i].first.destination,
00352                                 pending_messages_[i].first.seq_no);
00353 #endif
00354 #endif
00355                         if ( pending_messages_[i].first.event_notifier_callback != event_notifier_delegate_t() )
00356                         {
00357                             pending_messages_[i].first.event_notifier_callback(MSG_DROPPED,
00358                                 pending_messages_[i].first.destination,
00359                                 pending_messages_[i].second.buffer_size(),
00360                                 (uint8_t *) & pending_messages_[i].second );
00361                         }
00362 
00363                         // clear vector entry to be reused
00364                         pending_messages_[i].first.event_notifier_callback = event_notifier_delegate_t();
00365                         pending_messages_[i].first.seq_no = 0;
00366                     }
00367                     // a message was found
00368                     found = true;
00369                 }
00370             }
00371 
00372         }
00373 
00374 #ifdef DEBUG_RELIABLERADIO
00375         if (!found) {
00376             //                Debug::debug(os(), "ReliableRadio::info No pending messages\n");
00377         }
00378 #endif
00379         // Reset the timer
00380         timer().template set_timer<self_t, &self_t::reliable_daemon > (
00381                 sleep_time_, this, (void*) 0);
00382     }
00383 
00384     // find a free entry inside the pending messages vector
00385 
00386     int find_postition() {
00387         for (int i = 0; i < MAX_PENDING; i++) {
00388             if (pending_messages_[i].first.seq_no == 0) { // seq_no : 0 means no message
00389                 return i;
00390             } else {
00391             }
00392         }
00393         return -1;
00394     }
00395     ;
00396 
00397     // --------------------------------------------------------------------
00398 
00399     /*
00400      * Callback from the Radio module
00401      *
00402      * when a new message is received check its type:
00403      * - Ack_Message : mark as received all containing sequence numbers
00404      * - Broadcast message : send to the application
00405      * - ReliableMessage : send ack , and forward message to the application
00406      *
00407      *
00408      */
00409     void receive(node_id_t from, size_t len, block_data_t* data) {
00410 
00411 
00412         // do not receive own messages
00413         if (radio().id() == from) {
00414             return;
00415         }
00416         // check for open connections with the sender
00417         int sd = check_connection(from);
00418 
00419         // if a connection can be opened
00420         if (sd != -1) {
00421 
00422             // check the message id
00423             if (data[0] == ReliableMessage::ACK_MESSAGE) { // if an ack message
00424                 ReliableMessage recvack;
00425                 memcpy(&recvack, data, len);
00426 #ifdef DEBUG_RELIABLERADIO
00427 #ifdef ISENSE_APP
00428 
00429                 debug().debug("ReliableRadio::receive [ type= %d(ack) node= %x from= %x payload_size= %d ",
00430                         recvack.msg_id(),
00431                         radio().id(),
00432                         from,
00433                         recvack.payload_size());
00434 #ifndef EXTRA
00435                 debug().debug("]");
00436 #endif
00437 
00438 #else
00439 
00440                 debug().debug("ReliableRadio::receive [ type= %d(ack) node= %d from= %d payload_size= %d ",
00441                         recvack.msg_id(),
00442                         radio().id(),
00443                         from,
00444                         recvack.payload_size());
00445 #ifndef EXTRA
00446                 debug().debug("]\n");
00447 #endif
00448 #endif
00449 #endif
00450 
00451                 uint8_t seq_nos[recvack.payload_size()];
00452                 memcpy(seq_nos, recvack.payload(), recvack.payload_size());
00453                 // get sequence from the message and remove the respective messages from the list
00454                 for (size_t j = 0; j < recvack.payload_size(); j += 2) {
00455                     uint16_t seq_no_acked = seq_nos[j] + seq_nos[j + 1]*256;
00456 #ifdef EXTRA
00457                     debug().debug("ReliableRadio::ack_msg_seq_no %d\n", seq_no_acked);
00458 #endif
00459                     // search the whole vector
00460                     for (int i = 0; i < MAX_PENDING; i++) {
00461                         // if the sequence numbers match
00462                         if (pending_messages_[i].first.seq_no == seq_no_acked) {
00463                             // if the destinations match
00464                             if (pending_messages_[i].first.destination == from) {
00465                                 // remove entry
00466 #ifdef DEBUG_RELIABLERADIO
00467                                 debug().debug("ReliableRadio::receive seq_no_acked=%d\n", seq_no_acked);
00468 #endif
00469                                 if ( pending_messages_[i].first.event_notifier_callback != event_notifier_delegate_t() )
00470                                 {
00471                                     pending_messages_[i].first.event_notifier_callback(MSG_ACK_RCVD,
00472                                             pending_messages_[i].first.destination,
00473                                             pending_messages_[i].second.buffer_size(),
00474                                             (uint8_t *) & pending_messages_[i].second );
00475                                 }
00476 
00477                                 pending_messages_[i].first.event_notifier_callback = event_notifier_delegate_t();
00478                                 pending_messages_[i].first.seq_no = 0;
00479                                 acked_messages_++;
00480                             }
00481                         }
00482                     }
00483                 }
00484 #ifdef EXTRA
00485                 debug().debug("]\n");
00486 #endif
00487             } else if (data[0] == ReliableMessage::RELIABLE_MESSAGE) { // if a reliable message was received
00488 
00489                 ReliableMessage recvm;
00490 
00491                 memcpy(&recvm, data, len);
00492 
00493                 // get sequence number from the message
00494                 uint16_t curr_seq_no = recvm.seq_number();
00495                 // check if the message exists in cache
00496                 bool first_time = was_received(sd, curr_seq_no);
00497                 // if it does not
00498                 if (first_time) {
00499 #ifdef DEBUG_RELIABLERADIO
00500 #ifdef ISENSE_APP
00501                     debug().debug("ReliableRadio::receive [ type= %d(reliable) node= %x from= %x seq_no= %d ]",
00502                             data[0],
00503                             radio().id(),
00504                             from,
00505                             curr_seq_no);
00506 #else
00507                     debug().debug("ReliableRadio::receive [ type= %d(reliable) node= %d from= %d seq_no= %d ]\n",
00508                             data[0],
00509                             radio().id(),
00510                             from,
00511                             curr_seq_no);
00512 #endif
00513 #endif
00514                     // extract the real payload from the message
00515 
00516                     block_data_t msg_striped[recvm.payload_size()];
00517 
00518                     memcpy(msg_striped, recvm.payload(), recvm.payload_size());
00519 
00520                     // forward the payload to the application
00521                     //message_received_callback_(from, recvm.payload_size(), msg_striped);
00522                     notify_receivers(from, recvm.payload_size(), msg_striped);
00523 
00524 
00525                     // create the new ack message to send
00526 
00527                     ReliableMessage ackm;
00528 
00529                     ackm.set_msg_id(ReliableMessage::ACK_MESSAGE);
00530 
00531 
00532                     ackm.set_seq_number(0);
00533                     set_ack_list(&ackm, sd);
00534 
00535                     // send the ack message
00536                     radio().send(from, ackm.buffer_size(), (uint8_t *) & ackm);
00537 #ifdef DEBUG_RELIABLERADIO
00538 #ifdef ISENSE_APP
00539                     debug().debug("ReliableRadio::send [ type= %d(ack) size= %d destination= %x ",
00540                             ackm.msg_id(),
00541                             ackm.buffer_size(),
00542                             from);
00543 #else
00544                     debug().debug("ReliableRadio::send [ type= %d(ack) size= %d destination= %d ",
00545                             ackm.msg_id(),
00546                             ackm.buffer_size(),
00547                             from);
00548 #endif
00549 #endif
00550 #ifdef EXTRA
00551                     block_data_t ack[ackm.payload_size()];
00552                     memcpy(ack,&ackm,ackm.payload_size());
00553                     for (int i = 0; i < ackm.payload_size(); i+=2) {
00554                         debug().debug(" %d", ack[i] + ack[i+1]);
00555 
00556                     }
00557 #endif
00558 #ifdef DEBUG_RELIABLERADIO
00559                     debug().debug("]\n");
00560 #endif
00561                 } else {// if the sequence number exists in cache then just send an ack message ( message was forwarded before to the application
00562 
00563                     // create the ack message
00564                     ReliableMessage ackm;
00565 
00566                     ackm.set_msg_id(ReliableMessage::ACK_MESSAGE);
00567 
00568                     //ackm.set_payload(open_connections[sd].received_count*2);
00569 
00570                     set_ack_list(&ackm, sd);
00571                     // send the ack message
00572                     radio().send(from, ackm.buffer_size(), (uint8_t *) & ackm);
00573                 }
00574             } else if (data[0] == ReliableMessage::BROADCAST_MESSAGE) { // if a broadcast message was received forward to the application
00575 #ifdef DEBUG_RELIABLERADIO
00576 #ifdef ISENSE_APP
00577                 debug().debug("ReliableRadio::receive [ type= %d(broad) from= %x ]", data[0], from);
00578 #else
00579                 debug().debug("ReliableRadio::receive [ type= %d(broad) from= %d ]\n", data[0], from);
00580 #endif
00581 #endif
00582 
00583                 ReliableMessage recvm;
00584 
00585                 memcpy(&recvm, data, len);
00586 
00587                 // extract payload from the message received
00588                 block_data_t msg_recv[recvm.payload_size()];
00589                 memcpy(msg_recv, recvm.payload(), recvm.payload_size());
00590 
00591                 // forward payload to the application
00592                 //message_received_callback_(from, recvm.payload_size(), msg_recv);
00593                 notify_receivers(from, recvm.payload_size(), msg_recv);
00594 
00595             } else {
00596 #ifdef DEBUG_RELIABLERADIO
00597 #ifdef ISENSE_APP
00598                 debug().debug("ReliableRadio::error Unsupported message Received from %x", from);
00599 #else
00600                 debug().debug("ReliableRadio::error Unsupported message Received from %d\n", from);
00601 #endif
00602 #endif
00603             }
00604         }
00605     }
00606     ;
00607 
00608     void set_ack_list(ReliableMessage * m, int sd) {
00609 
00610         uint8_t seq_nos[open_connections[sd].received_count * 2];
00611         for (int i = 0; i < open_connections[sd].received_count; i++) {
00612             seq_nos[2 * i] = open_connections[sd].received[i] % 256;
00613             seq_nos[2 * i + 1] = open_connections[sd].received[i] / 256;
00614 
00615         }
00616         m->set_payload(open_connections[sd].received_count * 2, seq_nos);
00617 
00618     }
00619 
00620     // --------------------------------------------------------------------
00621 
00622     // Check if a connection with the sender was established before
00623 
00624     int check_connection(node_id_t sender) {
00625         int avail = -1;
00626         for (int i = 0; i < MAX_CONNECTIONS; i++) {
00627 
00628             if (open_connections[i].conn_id == sender) {
00629                 return i;
00630             } else if (open_connections[i].conn_id == NULL_NODE_ID) {
00631                 if (avail == -1) {
00632                     avail = i;
00633                 }
00634             }
00635         }
00636         // if the first connection get a new entry from the table
00637         if (avail != -1) {
00638 #ifdef DEBUG_RELIABLERADIO
00639 #ifdef ISENSE_APP
00640             debug().debug("ReliableRadio::connection Opened Connection %x to node %x", radio().id(), sender);
00641 #else
00642             debug().debug("ReliableRadio::connection Opened Connection %d to node %d\n", radio().id(), sender);
00643 #endif
00644 #endif
00645             open_connections[avail].conn_id = sender;
00646             open_connections[avail].max_received = 0;
00647             open_connections[avail].received_count = 0;
00648             for (int i = 0; i < MAX_RECV; i++) {
00649                 open_connections[avail].received[i] = 0;
00650             }
00651         }
00652         // return the connection id
00653         return avail;
00654     };
00655 
00656     // --------------------------------------------------------------------
00657 
00658     // check sequence numbers cache to see if the message with this sequence number was previously received
00659 
00660     bool was_received(int sd, uint16_t seq_no) {
00661         bool first_time = true; // true if the first time received
00662         for (int i = 0; i < open_connections[sd].received_count; i++) {
00663             if (open_connections[sd].received[i] == seq_no) first_time = false;
00664         }
00665         // add to message cache
00666         if (open_connections[sd].received_count < MAX_RECV) {
00667             open_connections[sd].received_count++;
00668             open_connections[sd].received[open_connections[sd].received_count - 1] = seq_no;
00669         } else {
00670             // shift all messages to the past and add seq_number as the latest
00671             for (int i = 0; i < MAX_RECV - 1; i++) {
00672                 open_connections[sd].received[i] = open_connections[sd].received[i + 1];
00673             }
00674             open_connections[sd].received[MAX_RECV - 1] = seq_no;
00675         }
00676 #ifdef EXTRA
00677         debug().debug("RECEIVED[ ");
00678         for (int i = 0; i < MAX_RECV; i++) {
00679             debug().debug(" %d", open_connections[sd].received[i]);
00680         }
00681         debug().debug("]\n");
00682 #endif
00683 
00684         return first_time;
00685     }
00686     ;
00687 
00688     void set_max_retries(int max_retries) {
00689         MAX_RETRIES = max_retries;
00690         debug().debug("ReliableRadio::set max_retries= %d\n", max_retries);
00691     };
00692 
00693     int max_retries() {
00694         return MAX_RETRIES;
00695     };
00696 
00697     int acked_messages() {
00698         return acked_messages_;
00699     };
00700 
00701     // --------------------------------------------------------------------
00702     // returns the node's id
00703 
00704     node_id_t id() {
00705         return radio().id();
00706     };
00707     // --------------------------------------------------------------------
00708 
00709 
00710     /*
00711             Os * os(){
00712                 return os_;
00713             }
00714             void set_os( Os * os ){
00715                 os_=os;
00716             };
00717      */
00718 private:
00719     int recv_callback_id_; //
00720     //radio_delegate_t message_received_callback_;
00721     //Os * os_; // os pointer
00722     uint16_t sequence_numbers_; //
00723     uint16_t current_time_; //
00724     vector_t pending_messages_;
00725     static const int time_slice_ = 200; // time_passes delay
00726     static const int sleep_time_ = 1000; // reliable_daemon's sleep time
00727 
    // Per-node state; one entry for every node this radio exchanged messages with.
    struct connections {
        // id of the other node; NULL_NODE_ID marks an unused entry
        node_id_t conn_id;
        // cache of the sequence numbers received from that node (oldest first)
        uint16_t received[MAX_RECV];
        // number of valid entries in received
        int received_count;
        // reset to 0 when the entry is opened; not read anywhere in this file
        uint16_t max_received;
        // next sequence number to use for a reliable message to that node
        uint16_t sequence_numbers_;
    } open_connections[MAX_CONNECTIONS];
00735 
00736     int MAX_RETRIES; // Maximum retries to deliver a message before abort
00737 
00738 
00739     //debuging
00740     int acked_messages_;
00741 
00742     Radio * radio_;
00743     Timer * timer_;
00744     Debug * debug_;
00745 
00746     Radio& radio() {
00747         return *radio_;
00748     }
00749 
00750     Timer& timer() {
00751         return *timer_;
00752     }
00753 
00754     Debug& debug() {
00755         return *debug_;
00756     }
00757 };
00758 
00759 }
00760 
00761 
00762 #endif   /* _RELIABLERADIO_H */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines