Wiselib
|
00001 /* 00002 * File: reliableradio.h 00003 * Author: amaxilatis 00004 * 00005 * Created on July 27, 2010 00006 */ 00007 00008 #ifndef _RELIABLERADIO_H 00009 #define _RELIABLERADIO_H 00010 00011 //#include "util/delegates/delegate.hpp" 00012 #include "util/pstl/vector_static.h" 00013 #include "util/pstl/pair.h" 00014 #include "util/delegates/delegate.hpp" 00015 #include "util/base_classes/radio_base.h" 00016 #include "reliablemsg.h" 00017 00018 //#include "communication/reliablemsg.h" 00019 00020 /* 00021 * DEBUG MESSAGES TEMPLATE 00022 * ReliableRadio::<task> [ type= ...] 00023 * 00024 * */ 00025 //#define DEBUG_RELIABLERADIO 00026 //#define EXTRA 00027 #ifdef ISENSE_RADIO_ADDR_TYPE 00028 #define ISENSE_APP 00029 #endif 00030 00031 #define MAX_PENDING 10 // Maximum number of pending for delivery messages 00032 #define TIMEOUT 800 // Timeout for time to pass 00033 #define MAX_CONNECTIONS 20 00034 #define MAX_RECV 15 00035 00036 namespace wiselib { 00037 00038 /* 00039 * ReliableRadio Template 00040 * Uses OsModel, Radio and Timer 00041 * Degug is only for debugging 00042 * ReliableRadio is used as a layer between normal radio 00043 * and application to make sure that messages will be 00044 * delivered besides any errors that may occur. 00045 * */ 00046 template<typename OsModel_P, 00047 typename Radio_P, 00048 typename Timer_P, 00049 typename Debug_P> 00050 class ReliableRadio 00051 : public RadioBase<OsModel_P, 00052 typename Radio_P::node_id_t, 00053 typename Radio_P::size_t, 00054 typename Radio_P::block_data_t> 00055 { 00056 00057 public: 00058 // Type definitions 00059 typedef OsModel_P OsModel; 00060 00061 typedef Radio_P Radio; 00062 typedef typename Radio::node_id_t node_id_t; 00063 typedef typename Radio::size_t size_t; 00064 typedef typename Radio::block_data_t block_data_t; 00065 00066 typedef typename Radio::message_id_t message_id_t; 00067 typedef ReliableMsg< OsModel,Radio> ReliableMessage; // a Reliable message with some extra headers for delivery checks 00068 00069 typedef delegate4<void, uint8_t, node_id_t, size_t, block_data_t*> event_notifier_delegate_t; 00070 00071 struct information { 00072 uint16_t seq_no; 00073 node_id_t destination; 00074 int timestamp; 00075 int retries; 00076 event_notifier_delegate_t event_notifier_callback; 00077 }; // stores information about the status of a sent message 00078 00079 typedef struct information information_t; 00080 typedef Debug_P Debug; 00081 typedef Timer_P Timer; 00082 00083 // Vector Storing all messages sent but receive not confirmed yet 00084 typedef wiselib::vector_static<OsModel, 00085 pair<information, ReliableMessage>, MAX_PENDING> vector_t; 00086 00087 typedef ReliableRadio<OsModel_P, Radio_P, Timer_P, Debug_P> self_t; 00088 00089 00090 // -------------------------------------------------------------------- 00091 00092 enum Events { 00093 MSG_DROPPED = 1, 00094 MSG_ACK_RCVD = 2 00095 }; 00096 00097 enum SpecialNodeIds { 00098 BROADCAST_ADDRESS = Radio::BROADCAST_ADDRESS, 00099 NULL_NODE_ID = Radio::NULL_NODE_ID 00101 }; 00102 // -------------------------------------------------------------------- 00103 00104 enum Restrictions { 00105 00106 MAX_MESSAGE_LENGTH = ReliableMessage::MAX_MESSAGE_LENGTH 00108 }; 00109 // -------------------------------------------------------------------- 00110 00111 void init (Radio& radio , Timer& timer , Debug& debug){ 00112 radio_ = &radio; 00113 timer_ = &timer; 00114 debug_ = &debug; 00115 }; 00116 00117 ReliableRadio() { 00118 MAX_RETRIES = 5; 00119 00120 acked_messages_=0; 00121 } 00122 ; 00123 00124 ~ReliableRadio() { 00125 00126 } 00127 ; 00128 00129 00130 00131 00132 // Enable the Radio 00133 00134 void enable_radio() { 00135 #ifdef DEBUG_RELIABLERADIO 00136 debug().debug("ReliableRadio::enable\n"); 00137 #endif 00138 00139 #ifdef ISENSE_APP 00140 debug().debug("Hello isense world\n"); 00141 #else 00142 debug().debug("running in shawn\n"); 00143 #endif 00144 00145 //set the local os pointer 00146 //set_os(os_); 00147 00148 //enable normal radio 00149 //Radio::enable(os()); 00150 radio().enable_radio(); 00151 00152 radio().template reg_recv_callback<self_t, 00153 &self_t::receive > ( this); 00154 // initialize the sequense_numbers to 1 (0 means empty) 00155 sequence_numbers_ = 1; 00156 // initialize the Vector to empty ( seq_no=0 means empty) 00157 for (int i = 0; i < MAX_PENDING; i++) { 00158 pending_messages_[i].first.seq_no = 0; 00159 pending_messages_[i].first.event_notifier_callback = event_notifier_delegate_t(); 00160 } 00161 for (int i=0;i < MAX_CONNECTIONS; i++){ 00162 open_connections[i].conn_id=NULL_NODE_ID; 00163 open_connections[i].sequence_numbers_=1; 00164 open_connections[i].received_count=0; 00165 } 00166 // timestamps start from 0 00167 current_time_ = 0; 00168 // set timer to change time 00169 timer().template set_timer<self_t, &self_t::time_passes > ( 00170 time_slice_, this, (void*) 0); 00171 // set timer to wake up realiable_deamon ( checks for undelivered messages and resends them ) 00172 timer().template set_timer<self_t, &self_t::reliable_daemon > ( 00173 sleep_time_, this, (void*) 0); 00174 00175 } 00176 ; 00177 00178 // -------------------------------------------------------------------- 00179 00180 void disable_radio() { 00181 radio().unreg_recv_callback(recv_callback_id_); 00182 }; 00183 00184 // Send a message the application has requested 00185 // template<class T, void (T::*TMethod)(uint8_t, node_id_t, size_t, block_data_t*)> 00186 // void sendX( node_id_t id, uint8_t len, uint8_t *data,T *obj_pnt) 00187 template<class T, void (T::*TMethod)(uint8_t, node_id_t, size_t, block_data_t*)> 00188 void send_callback(node_id_t from, size_t len, block_data_t *data,T *obj_pnt ) 00189 { 00190 00191 int pending_id = 0; 00192 if ( (pending_id = send(from,len,data)) != -1) { 00193 pending_messages_[pending_id].first.event_notifier_callback = event_notifier_delegate_t::template from_method<T, TMethod>( obj_pnt ); 00194 } 00195 00196 }; 00197 00198 int send( node_id_t id, size_t len, block_data_t *data) { 00199 //set_os(os_); 00200 if (id == BROADCAST_ADDRESS) { 00201 // create a message to host the data 00202 ReliableMessage m ; 00203 m.set_msg_id(ReliableMessage::BROADCAST_MESSAGE); 00204 m.set_seq_number(0); 00205 m.set_payload(len, data); 00206 // get the real payload to send 00207 00208 #ifdef DEBUG_RELIABLERADIO 00209 debug().debug("ReliableRadio::send [ type= %d(broad) size= %d]\n", m.msg_id() , m.buffer_size() ); 00210 #endif 00211 00212 // send a broadcast message 00213 radio().send(id, m.buffer_size(), (uint8_t * ) &m ); 00214 return -1; 00215 } 00216 /* 00217 * ADD message to Vector 00218 * CREATE a special message containing the original 00219 * SEND through normal radio 00220 * WAIT for ack or reliability_daemon to resend it 00221 * */ 00222 else { 00223 int pending_id = 0; // slot in the reliable vector available for new message 00224 pending_id = add_message(ReliableMessage::RELIABLE_MESSAGE, id, len, data); // add the message to the vector 00225 // If no position available inform APPLICATION 00226 // TODO: Inform application for error 00227 if (pending_id == -1) { 00228 #ifdef DEBUG_RELIABLERADIO 00229 debug().debug( "ReliableRadio::error [ type= vector_full ]\n"); 00230 #endif 00231 }// Get message from the Vector and send it through normal radio 00232 else { 00233 00234 #ifdef DEBUG_RELIABLERADIO 00235 #ifdef ISENSE_APP 00236 debug().debug("ReliableRadio::send [ type= %d(reliable) destination= %x seq_no= %d size= %d]", ReliableMessage::RELIABLE_MESSAGE, id, pending_messages_[pending_id].second.msg_id(), pending_messages_[pending_id].second.buffer_size()); 00237 #else 00238 debug().debug("ReliableRadio::send [ type= %d(reliable) destination= %d seq_no= %d size= %d]\n", ReliableMessage::RELIABLE_MESSAGE, id, pending_messages_[pending_id].second.msg_id(), pending_messages_[pending_id].second.buffer_size()); 00239 #endif 00240 #endif 00241 // send message through normal radio 00242 radio().send(id, pending_messages_[pending_id].second.buffer_size(), (uint8_t *) & pending_messages_[pending_id].second); 00243 } 00244 00245 return pending_id; 00246 } 00247 } 00248 ; 00249 00250 00251 00252 // Adds a message to the Vector and initializes the information struct conserning it 00253 int add_message(int msg_id, node_id_t destination, uint8_t len, uint8_t * data) { 00254 int conn_id = check_connection(destination); 00255 int pos; 00256 // find a free slot in the pending messages Vector to store the message Else return -1 00257 if ((pos = find_postition()) != -1) { 00258 // Store information about the message 00259 pending_messages_[pos].first.seq_no = open_connections[conn_id].sequence_numbers_++; 00260 pending_messages_[pos].first.destination = destination; 00261 pending_messages_[pos].first.timestamp = current_time_; 00262 pending_messages_[pos].first.retries = 0; 00263 // Store the message and set its headers 00264 pending_messages_[pos].second.set_msg_id(msg_id); 00265 pending_messages_[pos].second.set_seq_number(open_connections[conn_id].sequence_numbers_ - 1); 00266 pending_messages_[pos].second.set_payload(len, data); 00267 if (sequence_numbers_ == 0) { 00268 sequence_numbers_++; 00269 } 00270 return pos; 00271 } 00272 return -1; 00273 } 00274 ; 00275 00276 // -------------------------------------------------------------------- 00277 00278 00279 // Increment time by 1 00280 00281 void time_passes(void *) { 00282 current_time_++; 00283 //Debug::debug(os(),"Time is now %d\n",current_time_); 00284 timer().template set_timer<self_t, &self_t::time_passes > ( 00285 time_slice_, this, (void*) 0); 00286 00287 } 00288 ; 00289 00290 // -------------------------------------------------------------------- 00291 00292 /* 00293 * Reliable daemon 00294 * Check to see if any messages are sent 00295 * but not acked so to send them again. 00296 * 00297 * check by timestamps 00298 * if time for ack receive has passed 00299 * and retries 00300 * if maximum retries reached abort 00301 * 00302 * Reset timer for daemon 00303 */ 00304 00305 void reliable_daemon(void *) { 00306 bool found = false; // true if a message is to be resent 00307 for (int i = 0; i < MAX_PENDING; i++) { // search the message vector 00308 if (pending_messages_[i].first.seq_no != 0) { // only for valid entries : 0 means no message 00309 if (pending_messages_[i].first.timestamp < current_time_ - 2) { // 2 time units needed for message delivery 00310 if (pending_messages_[i].first.retries < MAX_RETRIES) { // if max retries reached 00311 // a message is to be resend 00312 pending_messages_[i].first.timestamp = current_time_; // refresh its sent timestamp 00313 00314 size_t mess_size = pending_messages_[i].second.buffer_size(); 00315 00316 mess_size = mess_size; 00317 00318 // resend the message 00319 radio().send( 00320 pending_messages_[i].first.destination, 00321 pending_messages_[i].second.buffer_size(), 00322 (uint8_t *) & pending_messages_[i].second); 00323 // increase by 1 retries 00324 pending_messages_[i].first.retries++; 00325 00326 #ifdef DEBUG_RELIABLERADIO 00327 #ifdef ISENSE_APP 00328 debug().debug("ReliableRadio::resend [ type= %d(reliable) destination= %x seq_no= %d size= %d]", ReliableMessage::RELIABLE_MESSAGE, 00329 pending_messages_[i].first.destination, 00330 pending_messages_[i].first.seq_no, 00331 mess_size 00332 ); 00333 #else 00334 debug().debug("ReliableRadio::resend [ type= %d(reliable) destination= %d seq_no= %d size= %d]\n", ReliableMessage::RELIABLE_MESSAGE, 00335 pending_messages_[i].first.destination, 00336 pending_messages_[i].first.seq_no, 00337 mess_size 00338 ); 00339 #endif 00340 #endif 00341 } else { // if max retries reached abort sending 00342 00343 #ifdef DEBUG_RELIABLERADIO 00344 #ifdef ISENSE_APP 00345 00346 debug().debug("ReliableRadio::abort [ type= %d(reliable) destination= %x seq_no= %d max_retries]", ReliableMessage::RELIABLE_MESSAGE, 00347 pending_messages_[i].first.destination, 00348 pending_messages_[i].first.seq_no); 00349 #else 00350 debug().debug("ReliableRadio::abort [ type= %d(reliable) destination= %d seq_no= %d max_retries]\n", ReliableMessage::RELIABLE_MESSAGE, 00351 pending_messages_[i].first.destination, 00352 pending_messages_[i].first.seq_no); 00353 #endif 00354 #endif 00355 if ( pending_messages_[i].first.event_notifier_callback != event_notifier_delegate_t() ) 00356 { 00357 pending_messages_[i].first.event_notifier_callback(MSG_DROPPED, 00358 pending_messages_[i].first.destination, 00359 pending_messages_[i].second.buffer_size(), 00360 (uint8_t *) & pending_messages_[i].second ); 00361 } 00362 00363 // clear vector entry to be reused 00364 pending_messages_[i].first.event_notifier_callback = event_notifier_delegate_t(); 00365 pending_messages_[i].first.seq_no = 0; 00366 } 00367 // a message was found 00368 found = true; 00369 } 00370 } 00371 00372 } 00373 00374 #ifdef DEBUG_RELIABLERADIO 00375 if (!found) { 00376 // Debug::debug(os(), "ReliableRadio::info No pending messages\n"); 00377 } 00378 #endif 00379 // Reset the timer 00380 timer().template set_timer<self_t, &self_t::reliable_daemon > ( 00381 sleep_time_, this, (void*) 0); 00382 } 00383 00384 // find a free entry inside the pending messages vector 00385 00386 int find_postition() { 00387 for (int i = 0; i < MAX_PENDING; i++) { 00388 if (pending_messages_[i].first.seq_no == 0) { // seq_no : 0 means no message 00389 return i; 00390 } else { 00391 } 00392 } 00393 return -1; 00394 } 00395 ; 00396 00397 // -------------------------------------------------------------------- 00398 00399 /* 00400 * Callback from the Radio module 00401 * 00402 * when a new message is received check its type: 00403 * - Ack_Message : mark as received all containing sequence numbers 00404 * - Broadcast message : send to the application 00405 * - ReliableMessage : send ack , and forward message to the application 00406 * 00407 * 00408 */ 00409 void receive(node_id_t from, size_t len, block_data_t* data) { 00410 00411 00412 // do not receive own messages 00413 if (radio().id() == from) { 00414 return; 00415 } 00416 // check for open connections with the sender 00417 int sd = check_connection(from); 00418 00419 // if a connection can be opened 00420 if (sd != -1) { 00421 00422 // check the message id 00423 if (data[0] == ReliableMessage::ACK_MESSAGE) { // if an ack message 00424 ReliableMessage recvack; 00425 memcpy(&recvack, data, len); 00426 #ifdef DEBUG_RELIABLERADIO 00427 #ifdef ISENSE_APP 00428 00429 debug().debug("ReliableRadio::receive [ type= %d(ack) node= %x from= %x payload_size= %d ", 00430 recvack.msg_id(), 00431 radio().id(), 00432 from, 00433 recvack.payload_size()); 00434 #ifndef EXTRA 00435 debug().debug("]"); 00436 #endif 00437 00438 #else 00439 00440 debug().debug("ReliableRadio::receive [ type= %d(ack) node= %d from= %d payload_size= %d ", 00441 recvack.msg_id(), 00442 radio().id(), 00443 from, 00444 recvack.payload_size()); 00445 #ifndef EXTRA 00446 debug().debug("]\n"); 00447 #endif 00448 #endif 00449 #endif 00450 00451 uint8_t seq_nos[recvack.payload_size()]; 00452 memcpy(seq_nos, recvack.payload(), recvack.payload_size()); 00453 // get sequence from the message and remove the respective messages from the list 00454 for (size_t j = 0; j < recvack.payload_size(); j += 2) { 00455 uint16_t seq_no_acked = seq_nos[j] + seq_nos[j + 1]*256; 00456 #ifdef EXTRA 00457 debug().debug("ReliableRadio::ack_msg_seq_no %d\n", seq_no_acked); 00458 #endif 00459 // search the whole vector 00460 for (int i = 0; i < MAX_PENDING; i++) { 00461 // if the sequence numbers match 00462 if (pending_messages_[i].first.seq_no == seq_no_acked) { 00463 // if the destinations match 00464 if (pending_messages_[i].first.destination == from) { 00465 // remove entry 00466 #ifdef DEBUG_RELIABLERADIO 00467 debug().debug("ReliableRadio::receive seq_no_acked=%d\n", seq_no_acked); 00468 #endif 00469 if ( pending_messages_[i].first.event_notifier_callback != event_notifier_delegate_t() ) 00470 { 00471 pending_messages_[i].first.event_notifier_callback(MSG_ACK_RCVD, 00472 pending_messages_[i].first.destination, 00473 pending_messages_[i].second.buffer_size(), 00474 (uint8_t *) & pending_messages_[i].second ); 00475 } 00476 00477 pending_messages_[i].first.event_notifier_callback = event_notifier_delegate_t(); 00478 pending_messages_[i].first.seq_no = 0; 00479 acked_messages_++; 00480 } 00481 } 00482 } 00483 } 00484 #ifdef EXTRA 00485 debug().debug("]\n"); 00486 #endif 00487 } else if (data[0] == ReliableMessage::RELIABLE_MESSAGE) { // if a reliable message was received 00488 00489 ReliableMessage recvm; 00490 00491 memcpy(&recvm, data, len); 00492 00493 // get sequence number from the message 00494 uint16_t curr_seq_no = recvm.seq_number(); 00495 // check if the message exists in cache 00496 bool first_time = was_received(sd, curr_seq_no); 00497 // if it does not 00498 if (first_time) { 00499 #ifdef DEBUG_RELIABLERADIO 00500 #ifdef ISENSE_APP 00501 debug().debug("ReliableRadio::receive [ type= %d(reliable) node= %x from= %x seq_no= %d ]", 00502 data[0], 00503 radio().id(), 00504 from, 00505 curr_seq_no); 00506 #else 00507 debug().debug("ReliableRadio::receive [ type= %d(reliable) node= %d from= %d seq_no= %d ]\n", 00508 data[0], 00509 radio().id(), 00510 from, 00511 curr_seq_no); 00512 #endif 00513 #endif 00514 // extract the real payload from the message 00515 00516 block_data_t msg_striped[recvm.payload_size()]; 00517 00518 memcpy(msg_striped, recvm.payload(), recvm.payload_size()); 00519 00520 // forward the payload to the application 00521 //message_received_callback_(from, recvm.payload_size(), msg_striped); 00522 notify_receivers(from, recvm.payload_size(), msg_striped); 00523 00524 00525 // create the new ack message to send 00526 00527 ReliableMessage ackm; 00528 00529 ackm.set_msg_id(ReliableMessage::ACK_MESSAGE); 00530 00531 00532 ackm.set_seq_number(0); 00533 set_ack_list(&ackm, sd); 00534 00535 // send the ack message 00536 radio().send(from, ackm.buffer_size(), (uint8_t *) & ackm); 00537 #ifdef DEBUG_RELIABLERADIO 00538 #ifdef ISENSE_APP 00539 debug().debug("ReliableRadio::send [ type= %d(ack) size= %d destination= %x ", 00540 ackm.msg_id(), 00541 ackm.buffer_size(), 00542 from); 00543 #else 00544 debug().debug("ReliableRadio::send [ type= %d(ack) size= %d destination= %d ", 00545 ackm.msg_id(), 00546 ackm.buffer_size(), 00547 from); 00548 #endif 00549 #endif 00550 #ifdef EXTRA 00551 block_data_t ack[ackm.payload_size()]; 00552 memcpy(ack,&ackm,ackm.payload_size()); 00553 for (int i = 0; i < ackm.payload_size(); i+=2) { 00554 debug().debug(" %d", ack[i] + ack[i+1]); 00555 00556 } 00557 #endif 00558 #ifdef DEBUG_RELIABLERADIO 00559 debug().debug("]\n"); 00560 #endif 00561 } else {// if the sequence number exists in cache then just send an ack message ( message was forwarded before to the application 00562 00563 // create the ack message 00564 ReliableMessage ackm; 00565 00566 ackm.set_msg_id(ReliableMessage::ACK_MESSAGE); 00567 00568 //ackm.set_payload(open_connections[sd].received_count*2); 00569 00570 set_ack_list(&ackm, sd); 00571 // send the ack message 00572 radio().send(from, ackm.buffer_size(), (uint8_t *) & ackm); 00573 } 00574 } else if (data[0] == ReliableMessage::BROADCAST_MESSAGE) { // if a broadcast message was received forward to the application 00575 #ifdef DEBUG_RELIABLERADIO 00576 #ifdef ISENSE_APP 00577 debug().debug("ReliableRadio::receive [ type= %d(broad) from= %x ]", data[0], from); 00578 #else 00579 debug().debug("ReliableRadio::receive [ type= %d(broad) from= %d ]\n", data[0], from); 00580 #endif 00581 #endif 00582 00583 ReliableMessage recvm; 00584 00585 memcpy(&recvm, data, len); 00586 00587 // extract payload from the message received 00588 block_data_t msg_recv[recvm.payload_size()]; 00589 memcpy(msg_recv, recvm.payload(), recvm.payload_size()); 00590 00591 // forward payload to the application 00592 //message_received_callback_(from, recvm.payload_size(), msg_recv); 00593 notify_receivers(from, recvm.payload_size(), msg_recv); 00594 00595 } else { 00596 #ifdef DEBUG_RELIABLERADIO 00597 #ifdef ISENSE_APP 00598 debug().debug("ReliableRadio::error Unsupported message Received from %x", from); 00599 #else 00600 debug().debug("ReliableRadio::error Unsupported message Received from %d\n", from); 00601 #endif 00602 #endif 00603 } 00604 } 00605 } 00606 ; 00607 00608 void set_ack_list(ReliableMessage * m, int sd) { 00609 00610 uint8_t seq_nos[open_connections[sd].received_count * 2]; 00611 for (int i = 0; i < open_connections[sd].received_count; i++) { 00612 seq_nos[2 * i] = open_connections[sd].received[i] % 256; 00613 seq_nos[2 * i + 1] = open_connections[sd].received[i] / 256; 00614 00615 } 00616 m->set_payload(open_connections[sd].received_count * 2, seq_nos); 00617 00618 } 00619 00620 // -------------------------------------------------------------------- 00621 00622 // Check if a connection with the sender was established before 00623 00624 int check_connection(node_id_t sender) { 00625 int avail = -1; 00626 for (int i = 0; i < MAX_CONNECTIONS; i++) { 00627 00628 if (open_connections[i].conn_id == sender) { 00629 return i; 00630 } else if (open_connections[i].conn_id == NULL_NODE_ID) { 00631 if (avail == -1) { 00632 avail = i; 00633 } 00634 } 00635 } 00636 // if the first connection get a new entry from the table 00637 if (avail != -1) { 00638 #ifdef DEBUG_RELIABLERADIO 00639 #ifdef ISENSE_APP 00640 debug().debug("ReliableRadio::connection Opened Connection %x to node %x", radio().id(), sender); 00641 #else 00642 debug().debug("ReliableRadio::connection Opened Connection %d to node %d\n", radio().id(), sender); 00643 #endif 00644 #endif 00645 open_connections[avail].conn_id = sender; 00646 open_connections[avail].max_received = 0; 00647 open_connections[avail].received_count = 0; 00648 for (int i = 0; i < MAX_RECV; i++) { 00649 open_connections[avail].received[i] = 0; 00650 } 00651 } 00652 // return the connection id 00653 return avail; 00654 }; 00655 00656 // -------------------------------------------------------------------- 00657 00658 // check sequence numbers cache to see if the message with this sequence number was previously received 00659 00660 bool was_received(int sd, uint16_t seq_no) { 00661 bool first_time = true; // true if the first time received 00662 for (int i = 0; i < open_connections[sd].received_count; i++) { 00663 if (open_connections[sd].received[i] == seq_no) first_time = false; 00664 } 00665 // add to message cache 00666 if (open_connections[sd].received_count < MAX_RECV) { 00667 open_connections[sd].received_count++; 00668 open_connections[sd].received[open_connections[sd].received_count - 1] = seq_no; 00669 } else { 00670 // shift all messages to the past and add seq_number as the latest 00671 for (int i = 0; i < MAX_RECV - 1; i++) { 00672 open_connections[sd].received[i] = open_connections[sd].received[i + 1]; 00673 } 00674 open_connections[sd].received[MAX_RECV - 1] = seq_no; 00675 } 00676 #ifdef EXTRA 00677 debug().debug("RECEIVED[ "); 00678 for (int i = 0; i < MAX_RECV; i++) { 00679 debug().debug(" %d", open_connections[sd].received[i]); 00680 } 00681 debug().debug("]\n"); 00682 #endif 00683 00684 return first_time; 00685 } 00686 ; 00687 00688 void set_max_retries(int max_retries) { 00689 MAX_RETRIES = max_retries; 00690 debug().debug("ReliableRadio::set max_retries= %d\n", max_retries); 00691 }; 00692 00693 int max_retries() { 00694 return MAX_RETRIES; 00695 }; 00696 00697 int acked_messages() { 00698 return acked_messages_; 00699 }; 00700 00701 // -------------------------------------------------------------------- 00702 // returns the node's id 00703 00704 node_id_t id() { 00705 return radio().id(); 00706 }; 00707 // -------------------------------------------------------------------- 00708 00709 00710 /* 00711 Os * os(){ 00712 return os_; 00713 } 00714 void set_os( Os * os ){ 00715 os_=os; 00716 }; 00717 */ 00718 private: 00719 int recv_callback_id_; // 00720 //radio_delegate_t message_received_callback_; 00721 //Os * os_; // os pointer 00722 uint16_t sequence_numbers_; // 00723 uint16_t current_time_; // 00724 vector_t pending_messages_; 00725 static const int time_slice_ = 200; // time_passes delay 00726 static const int sleep_time_ = 1000; // reliable_daemon's sleep time 00727 00728 struct connections { 00729 node_id_t conn_id; 00730 uint16_t received[MAX_RECV]; 00731 int received_count; 00732 uint16_t max_received; 00733 uint16_t sequence_numbers_; 00734 } open_connections[MAX_CONNECTIONS]; 00735 00736 int MAX_RETRIES; // Maximum retries to deliver a message before abort 00737 00738 00739 //debuging 00740 int acked_messages_; 00741 00742 Radio * radio_; 00743 Timer * timer_; 00744 Debug * debug_; 00745 00746 Radio& radio() { 00747 return *radio_; 00748 } 00749 00750 Timer& timer() { 00751 return *timer_; 00752 } 00753 00754 Debug& debug() { 00755 return *debug_; 00756 } 00757 }; 00758 00759 } 00760 00761 00762 #endif /* _RELIABLERADIO_H */