Wiselib
|
00001 /* 00002 * File: volumeradio.h 00003 * Author: amaxilatis 00004 * 00005 * Created on August 2, 2010, 1:15 PM 00006 */ 00007 00008 #ifndef _VOLUMERADIO_H 00009 #define _VOLUMERADIO_H 00010 00011 //wiselib includes 00012 #include "util/delegates/delegate.hpp" 00013 #include "util/pstl/vector_static.h" 00014 #include "util/pstl/pair.h" 00015 00016 #include "../radio_base.h" 00017 00018 // volume message type include 00019 #include "volumemsg.h" 00020 00021 /* 00022 * DEBUG MESSAGES TEMPLATE 00023 * VolumeRadio::<task> [ type= ...] 00024 * 00025 * */ 00026 #define DEBUG_VOLUMERADIO 00027 00028 namespace wiselib { 00029 00030 /* 00031 * ReliableRadio Template 00032 * Uses OsModel, Radio and Timer 00033 * Degug is only for debugging 00034 * ReliableRadio is used as a layer between normal radio 00035 * and application to make sure that messages will be 00036 * delivered besides any errors that may occur. 00037 * */ 00038 template<typename OsModel_P, typename Radio_P, typename Timer_P, 00039 typename Debug_P> 00040 class VolumeRadio 00041 : public RadioBase<OsModel_P,Radio_P>{ 00042 public: 00043 // Type definitions 00044 typedef OsModel_P OsModel; 00045 00046 typedef Radio_P Radio; 00047 00048 typedef Debug_P Debug; 00049 typedef Timer_P Timer; 00050 00051 typedef typename Radio::node_id_t node_id_t; 00052 typedef typename Radio::size_t size_t; 00053 typedef typename Radio::block_data_t block_data_t; 00054 typedef typename Radio::message_id_t message_id_t; 00055 00056 00057 00058 00059 00060 00061 typedef VolumeRadio<OsModel_P, Radio_P, Timer_P, Debug_P> self_t; 00062 00063 00064 typedef VolumeMsg<OsModel, Radio > VolumeMessage_t; 00065 00066 /* 00067 * struct that describes a message buffer 00068 * contains the sequence number 00069 * the source of the message 00070 * and data to show if receiving is complete 00071 */ 00072 struct information { 00073 uint16_t seq_no; 00074 node_id_t source; 00075 uint8_t fragments_received; 00076 uint8_t total_fragments; 00077 }; 00078 00079 typedef struct information information_t; 00080 00081 00082 // radio callbacks delegate 00083 typedef delegate3<void, int, long, unsigned char*> radio_delegate_t; 00084 00085 // -------------------------------------------------------------------- 00086 00087 enum SpecialNodeIds { 00088 BROADCAST_ADDRESS = Radio::BROADCAST_ADDRESS, 00089 NULL_NODE_ID = Radio::NULL_NODE_ID 00091 }; 00092 // -------------------------------------------------------------------- 00093 00094 enum Restrictions { 00095 FRAGMENT_SIZE = VolumeMessage_t::FRAGMENT_SIZE, 00096 MAX_MESSAGE_LENGTH = VolumeMessage_t::MAX_MESSAGE_LENGTH 00098 }; 00099 00100 // Vector Storing all messages received but not finished yet 00101 typedef wiselib::vector_static<OsModel, 00102 pair<information_t, uint8_t* >, MAX_PENDING> vector_t; 00103 00104 // -------------------------------------------------------------------- 00105 00106 VolumeRadio() { 00107 } 00108 ; 00109 00110 ~VolumeRadio() { 00111 } 00112 ; 00113 00114 00115 // Enable the Radio 00116 00117 void enable_radio() { 00118 #ifdef DEBUG_VOLUMERADIO 00119 debug().debug("VolumeRadio::enable\n"); 00120 #endif 00121 00122 //set the local os pointer 00123 00124 //enable normal radio 00125 radio().enable_radio(); 00126 // register receive callback to normal radio 00127 recv_callback_id_ = radio().template reg_recv_callback<self_t, 00128 &self_t::receive > ( this); 00129 00130 // initialize the buffers for pending messages 00131 for (int i = 0; i < MAX_PENDING; i++) { 00132 pending_messages_[i].first.seq_no = 0; // seq_no == 0 means unused 00133 pending_messages_[i].second = new uint8_t[MAX_MESSAGE_LENGTH]; // set the buffer size 00134 00135 } 00136 00137 // initialize the connections array 00138 for (int i = 0; i < MAX_CONNECTIONS_; i++) { 00139 open_connections[i].conn_id = -1; // -1 means connection unused 00140 open_connections[i].sequence_numbers_ = 1; // sequence numbers start from 1 ( 0 means unused see above) 00141 } 00142 00143 // ??? only for broadcast messages ??? 00144 seq_numbers_ = 1; 00145 00146 00147 00148 00149 } 00150 ; 00151 00152 // -------------------------------------------------------------------- 00153 00154 00155 // Send a message the application has requested 00156 00157 void send( node_id_t id, uint16_t len, block_data_t *data) { 00158 00159 block_data_t local_data[len]; 00160 00161 00162 00163 // check for an open connection to the destination 00164 int connection = check_connection(id); 00165 // get the next available sequence number 00166 uint16_t seq_no = open_connections[connection].sequence_numbers_++; 00167 // if the next sequence number is 0 then set it as 1 00168 if (open_connections[connection].sequence_numbers_ == 0) open_connections[connection].sequence_numbers_++; 00169 00170 #ifdef DEBUG_VOLUMERADIO 00171 debug().debug( "VolumeRadio::send [node %d |to %d |len %d |seq_no %d ", radio().id(), id, len, seq_no); 00172 #endif 00173 00174 // create a new volume message 00175 VolumeMessage_t m = VolumeMessage_t(); 00176 // initialize the message 00177 00178 if (len>VolumeMessage_t::FRAGMENT_SIZE){ 00179 m.set_msg_id(VolumeMessage_t::VOLUME_MESSAGE); 00180 } 00181 else{ 00182 m.set_msg_id(VolumeMessage_t::SINGLE_MESSAGE); 00183 } 00184 00185 m.set_seq_number(seq_no); 00186 00187 uint8_t fragments = len/VolumeMessage_t::FRAGMENT_SIZE + 1; 00188 #ifdef DEBUG_VOLUMERADIO 00189 debug().debug( "split_count= %d ]\n", fragments); 00190 #endif 00191 m.set_fragments(fragments); 00192 // Send all fragments of the message 00193 uint8_t i; 00194 for (i = 0 ; i < (fragments-1); i++) { 00195 // create a temp buffer for the message 00196 m.set_fragment_id(i); 00197 m.set_payload(FRAGMENT_SIZE,local_data+(i*FRAGMENT_SIZE)); 00198 00199 // send the fragment 00200 radio().send(id, m.buffer_size(), (uint8_t * ) &m); 00201 #ifdef DEBUG_VOLUMERADIO 00202 debug().debug( "VolumeRadio::send [%d |dest= %d |seq_no %d |fr_no %d |tot_fr %d |size= %d |...]\n", 00203 m.msg_id(), 00204 id, 00205 m.seq_number(), 00206 m.fragment_id(), 00207 m.fragments(), 00208 m.buffer_size() 00209 00210 ); 00211 #endif 00212 00213 } 00214 m.set_fragment_id(i); 00215 m.set_payload(len%FRAGMENT_SIZE,data+(i*FRAGMENT_SIZE)); 00216 radio().send(id, (size_t)m.buffer_size(), (uint8_t * ) &m); 00217 #ifdef DEBUG_VOLUMERADIO 00218 debug().debug( "VolumeRadio::send [%d |dest= %d |seq_no %d |fr_no %d |tot_fr %d |size= %d |...]\n", 00219 m.msg_id(), 00220 id, 00221 m.seq_number(), 00222 m.fragment_id(), 00223 m.fragments(), 00224 m.buffer_size() 00225 00226 ); 00227 #endif 00228 00229 00230 00231 } 00232 ; 00233 00234 00235 00236 // -------------------------------------------------------------------- 00237 00238 /* 00239 * Callback from the Radio module 00240 * 00241 * when a new message is received check its type: 00242 * - Ack_Message : mark as received all containing sequence numbers 00243 * - Broadcast message : send to the application 00244 * - ReliableMessage : send ack , and forward message to the application 00245 * 00246 * 00247 */ 00248 void receive(node_id_t from, size_t len, block_data_t* data_t) { 00249 // do not receive own messages 00250 if (radio().id() == from) { 00251 debug().debug( "message heard\n"); 00252 return; 00253 } 00254 VolumeMessage_t mrecv; 00255 memcpy(&mrecv,data_t,len); 00256 00257 if (mrecv.msg_id() == VolumeMessage_t::VOLUME_MESSAGE) { 00258 00259 #ifdef DEBUG_VOLUMERADIO 00260 debug().debug( "VolumeRadio::receive [%d |from %d |seq_no %d |fr_no %d |tot_fr %d |size %d |...]\n", 00261 mrecv.msg_id(), 00262 from, 00263 mrecv.seq_number(), 00264 mrecv.fragment_id(), 00265 mrecv.fragments(), 00266 len); 00267 #endif 00268 00269 // get all the information from the header 00270 node_id_t source = from; 00271 uint16_t seq_no = mrecv.seq_number(); 00272 uint8_t fragment_num = mrecv.fragment_id(); 00273 uint8_t total_frags = mrecv.fragments(); 00274 00275 00276 // local buffer for payload fragment 00277 uint8_t t_buff[mrecv.payload_size()]; 00278 00279 memcpy(t_buff,mrecv.payload(),mrecv.payload_size()); 00280 //t_buff = VolumeMessage_t::strip_message(data); 00281 00282 // find the buffer dedicated to this message 00283 int fd = flow_id(seq_no, source, total_frags); 00284 00285 if (fd != -1) { 00286 #ifdef DEBUG_VOLUMERADIO 00287 debug().debug( "VolumeRadio::receive pending_message %d frag_no %d\n", fd, fragment_num); 00288 #endif 00289 00290 // add fragment to the buffer. if finished set the flag 00291 bool finished = add_fragment_to_message(fd, fragment_num, t_buff); 00292 00293 // if finished forward to the application 00294 if (finished) { 00295 #ifdef DEBUG_VOLUMERADIO 00296 debug().debug( "VolumeRadio::receive pending_message %d completed %d \n", fd , ); 00297 #endif 00298 notify_receivers(from, (FRAGMENT_SIZE) * total_frags, pending_messages_[fd].second); 00299 } 00300 00301 00302 } else { 00303 #ifdef DEBUG_VOLUMERADIO 00304 debug().debug( "VolumeRadio::receive vector is full\n"); 00305 #endif 00306 } 00307 00308 00309 } else if (mrecv.msg_id() == VolumeMessage_t::SINGLE_MESSAGE) { 00310 00311 #ifdef DEBUG_VOLUMERADIO 00312 debug().debug( "VolumeRadio::receive [%d |from %d |seq_no %d |fr_no %d |tot_fr %d |...]\n", 00313 mrecv.msg_id(), 00314 from, 00315 mrecv.seq_number(), 00316 mrecv.fragment_id(), 00317 mrecv.fragments() 00318 ); 00319 #endif 00320 // get the payload from the buffer 00321 uint8_t t_buff[mrecv.payload_size()]; 00322 00323 memcpy(t_buff,mrecv.payload(),mrecv.payload_size()); 00324 //uint8_t * t_buff = new uint8_t[len - VolumeMessage_t::header_size()]; 00325 //t_buff = VolumeMessage_t::strip_message(data); 00326 // forward message to the application 00327 00328 #ifdef DEBUG_VOLUMERADIO 00329 debug().debug( "VolumeRadio::message_received_callback\n"); 00330 #endif 00331 notify_receivers(from, mrecv.payload_size() , t_buff); 00332 00333 00334 } 00335 00336 } 00337 ; 00338 00339 // checks the vector of messages to see if the first fragment. 00340 // if not returns the id of the vector position 00341 // if yes allocates an available buffer and initializes data 00342 // if yes && buffers full returns -1 00343 00344 int flow_id(uint16_t seq_no, node_id_t source, uint8_t total_frags) { 00345 int fd = -1; 00346 00347 // search vector for message and a free slot 00348 for (int i = 0; i < MAX_PENDING; i++) { 00349 if ((seq_no == pending_messages_[i].first.seq_no) && (source == pending_messages_[i].first.source)) 00350 return i; 00351 if ((pending_messages_[i].first.seq_no == 0) && (fd == -1)) 00352 fd = i; 00353 } 00354 if (fd != -1) { 00355 // if does not exist but we have an empty buffer allocate it 00356 pending_messages_[fd].first.seq_no = seq_no; 00357 pending_messages_[fd].first.source = source; 00358 pending_messages_[fd].first.fragments_received = 0; 00359 pending_messages_[fd].first.total_fragments = total_frags; 00360 } 00361 // if not return -1 00362 return fd; 00363 00364 }; 00365 00366 // add a fragment to the payload buffer and returns if finished 00367 00368 bool add_fragment_to_message(int fd, uint8_t fragment_num, uint8_t * data) { 00369 // copy data to local memory 00370 memcpy(pending_messages_[fd].second + FRAGMENT_SIZE*fragment_num, data, FRAGMENT_SIZE); 00371 // inrease fragments received 00372 pending_messages_[fd].first.fragments_received++; 00373 // if the last fragment clear the buffer 00374 if (pending_messages_[fd].first.fragments_received == 00375 pending_messages_[fd].first.total_fragments) { 00376 pending_messages_[fd].first.seq_no = 0; 00377 // return message is complete 00378 return true; 00379 } else { 00380 // return message is incomplete 00381 return false; 00382 } 00383 00384 }; 00385 // -------------------------------------------------------------------- 00386 00387 // Check if a connection with the sender was established before 00388 00389 int check_connection(node_id_t node) { 00390 int avail = -1; 00391 for (int i = 0; i < MAX_CONNECTIONS_; i++) { 00392 if (open_connections[i].conn_id == node) { 00393 return i; 00394 } else if (open_connections[i].conn_id == -1) { 00395 if (avail == -1) { 00396 avail = i; 00397 } 00398 } 00399 } 00400 // if the first connection get a new entry from the table 00401 if (avail != -1) { 00402 #ifdef DEBUG_VOLUMERADIO 00403 debug().debug( "VolumeRadio::connection Opened Connection %d to node %d\n", radio().id(), node); 00404 #endif 00405 open_connections[avail].conn_id = node; 00406 open_connections[avail].sequence_numbers_ = 1; 00407 00408 } else { 00409 #ifdef DEBUG_VOLUMERADIO 00410 debug().debug( "VolumeRadio::connection Out of connections node= %d\n", radio().id()); 00411 #endif 00412 } 00413 // return the connection id 00414 return avail; 00415 }; 00416 00417 // -------------------------------------------------------------------- 00418 00419 00420 // returns the node's id 00421 00422 node_id_t id() { 00423 return radio().id(); 00424 } 00425 // -------------------------------------------------------------------- 00426 00427 00428 /* 00429 initialize the module 00430 */ 00431 void init(Radio& radio, Timer& timer, Debug& debug) { 00432 radio_ = &radio; 00433 timer_ = &timer; 00434 debug_ = &debug; 00435 }; 00436 00437 00438 private: 00439 int recv_callback_id_; // callback for receive function 00440 radio_delegate_t message_received_callback_; // callback for application's receive function 00441 00442 00443 vector_t pending_messages_; // contains the buffers for the pending messages 00444 00445 static const int MAX_CONNECTIONS_ = 40; // maximum available connections 00446 uint16_t seq_numbers_; // sequence number for messages 00447 00448 struct connections { 00449 node_id_t conn_id; 00450 uint16_t sequence_numbers_; 00451 } open_connections[MAX_CONNECTIONS_]; 00452 00453 00454 00455 00456 Radio * radio_; 00457 Timer * timer_; 00458 Debug * debug_; 00459 00460 Radio& radio() { 00461 return *radio_; 00462 } 00463 00464 Timer& timer() { 00465 return *timer_; 00466 } 00467 00468 Debug& debug() { 00469 return *debug_; 00470 } 00471 }; 00472 00473 } 00474 00475 00476 00477 00478 #endif /* _VOLUMERADIO_H */ 00479