Wiselib
|
00001 #ifndef __DELAYED_MESSAGE_DELIVERY_H__ 00002 #define __DELAYED_MESSAGE_DELIVERY_H__ 00003 00004 #include <stdint.h> 00005 #include <sstream> 00006 00007 #include "util/serialization/serialization.h" 00008 00009 #include "roomba_statistics.h" 00010 00011 #include "external_interface/pc/roomba.h" 00012 #include "external_interface/external_interface.h" 00013 00014 #include "algorithms/neighbor_discovery/echo.h" 00015 #include "algorithms/neighbor_discovery/pgb_payloads_ids.h" 00016 #include "algorithms/end_to_end_communication/end_to_end_communication_msg.h" 00017 00018 template< typename OsModel_P, 00019 typename NeighborDiscovery_P, 00020 typename Roomba_P, 00021 typename Radio_P = typename OsModel_P::Radio, 00022 typename Debug_P = typename OsModel_P::Debug, 00023 typename Timer_P = typename OsModel_P::Timer, 00024 typename Clock_P = typename OsModel_P::Clock 00025 > 00026 class RoombaMessageDelivery { 00027 public: 00028 typedef OsModel_P Os; 00029 typedef NeighborDiscovery_P NeighborDiscovery; 00030 typedef Roomba_P Roomba; 00031 typedef Radio_P Radio; 00032 typedef Debug_P Debug; 00033 typedef Timer_P Timer; 00034 typedef Clock_P Clock; 00035 00036 typedef RoombaMessageDelivery<Os,NeighborDiscovery, Roomba, Radio, Debug, Timer> self_type; 00037 typedef self_type* self_pointer_t; 00038 00039 typedef typename wiselib::CommunicationMessage<Os, Radio> msg_t; 00040 00041 typedef typename Radio::block_data_t block_data_t; 00042 typedef typename Radio::size_t size_t; 00043 typedef typename Radio::node_id_t node_id_t; 00044 typedef typename Radio::message_id_t message_id_t; 00045 00046 enum { 00047 MAX_NUMBER_OF_STORED_MESSAGES = 255, // maximum number of stored messages 00048 }; 00049 00050 enum { 00051 MAX_NUMBER_OF_ROOMBAS = 10, // maximum number of Roombas in system, 00052 //gives size for roomba-interchange table 00053 }; 00054 00055 void init( NeighborDiscovery& neighbor_discovery, 00056 Roomba& roomba, 00057 Radio& radio, 00058 Debug& debug, 00059 Timer& timer, 00060 Clock& clock 00061 ) { 00062 stopped_ = true; 00063 00064 radio_ = &radio; 00065 debug_ = &debug; 00066 timer_ = &timer; 00067 clock_ = &clock; 00068 roomba_ = &roomba; 00069 neighbor_discovery_ = &neighbor_discovery; 00070 00071 init(); 00072 } 00073 00074 00075 void init() 00076 { 00077 // enable statistics output 00078 typename Os::Clock clock; 00079 starting_time_ = clock.seconds( clock.time() ); 00080 std::ostringstream time_string; 00081 time_string << "statistics_" << starting_time_ << ".txt"; 00082 statistics_ = new RoombaStatistics( time_string.str() ); 00083 00084 radio_->enable_radio(); 00085 radio_recv_callback_id_ = radio_->template reg_recv_callback<self_type, &self_type::on_receive>( this ); 00086 00087 neighbor_discovery_->enable(); 00088 if( neighbor_discovery_->register_payload_space( MOBILITY ) ) 00089 debug_->debug( "Could not register payload space in neighbor discovery module!\n" ); 00090 00091 uint8_t data = 1; 00092 neighbor_discovery_->set_payload( MOBILITY, &data, sizeof( data ) ); 00093 00094 uint8_t flags = NeighborDiscovery::NEW_NB_BIDI; 00095 neighbor_discovery_->template reg_event_callback<self_type, &self_type::on_new_neighbor>( MOBILITY, flags, this ); 00096 flags = NeighborDiscovery::NEW_PAYLOAD_BIDI; 00097 neighbor_discovery_->template reg_event_callback<self_type, &self_type::on_new_payload>( MOBILITY, flags, this ); 00098 00099 00100 // Initially no messages are stored. 00101 for( uint16_t i=0; i < MAX_NUMBER_OF_STORED_MESSAGES; ++i ) 00102 { 00103 stored_messages_[i].used = false; 00104 } 00105 number_of_stored_messages_ = 0; 00106 00107 // Initially no Roomba is known 00108 for( uint16_t i=0; i < MAX_NUMBER_OF_ROOMBAS; ++i ) 00109 { 00110 roomba_interchanged_msg_table_[i].used = false; 00111 } 00112 00113 timer_->template set_timer<self_type, &self_type::on_time>( 1000, this, 0 ); 00114 00115 stopped_ = false; 00116 } 00117 00118 int destruct() 00119 { 00120 stopped_ = true; 00121 00122 //Unregister callbacks 00123 neighbor_discovery_->unregister_payload_space( MOBILITY ); 00124 neighbor_discovery_->unreg_recv_callback( MOBILITY ); 00125 radio_->unregister_recv_callback( radio_recv_callback_id_ ); 00126 00127 neighbor_discovery_->disable(); 00128 radio_->disable_radio(); 00129 00130 for( uint16_t i=0; i < MAX_NUMBER_OF_STORED_MESSAGES; ++i ) 00131 { 00132 stored_messages_[i].used = false; 00133 } 00134 number_of_stored_messages_ = 0; 00135 00136 statistics_->close_statistics(); 00137 00138 return Os::SUCCES; 00139 } 00140 00145 uint16_t number_of_stored_messages() 00146 { 00147 return number_of_stored_messages_; 00148 } 00149 00150 protected: 00160 void on_new_neighbor( uint8_t event, node_id_t from, uint8_t len, uint8_t* data ) { 00161 debug_->debug( "on_new_neighbor %d\n", from ); 00162 00163 if( !stopped_ ) 00164 { 00165 if( ( event & NeighborDiscovery::NEW_NB_BIDI ) != 0 ) 00166 { 00167 for( int i=0; i < MAX_NUMBER_OF_STORED_MESSAGES; i++ ) 00168 { 00169 if( stored_messages_[i].used ) 00170 { 00171 msg_t* msg = &stored_messages_[i].message; 00172 00173 if( msg->dest() == from ) 00174 { 00175 debug_->debug( "Delivering message to %d, size: %d.\n", msg->dest(), msg->buffer_size() ); 00176 // Deliver stored message 00177 radio_->send( msg->dest(), msg->buffer_size(), (block_data_t*)msg ); 00178 00179 stored_messages_[i].used = false; 00180 number_of_stored_messages_--; 00181 00182 //Save statistics output 00183 typename Os::Clock clock; 00184 statistics_->print_statistics( 00185 clock.seconds( clock.time() ) - starting_time_, 00186 number_of_stored_messages_, 00187 -1 00188 ); 00189 } 00190 } 00191 } 00192 00193 debug_->debug( "number of stored messages: %d\n", number_of_stored_messages_); 00194 } 00195 } 00196 } 00197 00198 void on_new_payload( uint8_t event, node_id_t from, uint8_t len, uint8_t* data ) { 00199 //debug_->debug( "on_new_payload %d, %d, %d\n", event, from, *data ); 00200 00201 uint16_t i; 00202 if( *data ) { 00203 for( i=0; i<MAX_NUMBER_OF_ROOMBAS; ++i ) 00204 { 00205 if ( // return if at pos i is roomba 00206 roomba_interchanged_msg_table_[i].used == true && 00207 from == roomba_interchanged_msg_table_[i].roomba_id 00208 ) { 00209 return; 00210 } 00211 } 00212 00213 // hence, at this point we know that the roomba did not get the data, yet 00214 for( i=0; i < MAX_NUMBER_OF_ROOMBAS; ++i ) 00215 { 00216 if( roomba_interchanged_msg_table_[i].used == false ) 00217 { 00218 roomba_interchanged_msg_table_[i].used = true; 00219 roomba_interchanged_msg_table_[i].roomba_id = from; 00220 00221 break; 00222 } 00223 } 00224 if( i == MAX_NUMBER_OF_ROOMBAS ) // All slots are used. 00225 { 00226 // Drop first one, but give error message! 00227 // this should not happen! 00228 roomba_interchanged_msg_table_[0].used = true; 00229 roomba_interchanged_msg_table_[0].roomba_id = from; 00230 00231 debug_->debug( "Too many roombas in system for proper data dump exchange!\n" ); 00232 } 00233 } 00234 00235 // now send all stored messages to other roomba 00236 for( i=0; i<MAX_NUMBER_OF_STORED_MESSAGES; ++i ) 00237 { 00238 msg_t* msg = &stored_messages_[i].message; 00239 radio_->send( msg->dest(), msg->buffer_size(), (block_data_t*)msg ); 00240 } 00241 } 00242 00247 void on_time( void* userdata ) 00248 { 00249 //debug_->debug( "on_time: trying to deliver %d messages\n", number_of_stored_messages_ ); 00250 if( !stopped_ ) 00251 { 00252 for( int i=0; i < MAX_NUMBER_OF_STORED_MESSAGES; i++ ) 00253 { 00254 if( stored_messages_[i].used ) 00255 { 00256 msg_t* msg = &stored_messages_[i].message; 00257 00258 if( neighbor_discovery_->is_neighbor_bidi( msg->dest() ) ) 00259 { 00260 debug_->debug( "Delivering message to %d, size: %d.\n", msg->dest(), msg->buffer_size() ); 00261 radio_->send( msg->dest(), msg->buffer_size(), (block_data_t*)msg ); 00262 stored_messages_[i].used = false; 00263 number_of_stored_messages_--; 00264 00265 //Save statistics output 00266 typename Os::Clock clock; 00267 statistics_->print_statistics( 00268 clock.seconds( clock.time() ) - starting_time_, 00269 number_of_stored_messages_, 00270 -1 00271 ); 00272 } else { 00273 stored_messages_[i].store_duration++; 00274 } 00275 } 00276 } 00277 } 00278 00279 if( number_of_stored_messages_ > 0 ) 00280 { 00281 debug_->debug( "\a" ); 00282 } 00283 00284 timer_->template set_timer<self_type, &self_type::on_time>( 1000, this, 0 ); 00285 } 00286 00287 00288 00297 void on_receive( node_id_t id, size_t size, block_data_t* data) { 00298 debug_->debug( "on_receive from %d, msg_id: %d\n", id, wiselib::read<Os, block_data_t, message_id_t>( data ) ); 00299 00300 message_id_t msg_id = wiselib::read<Os, block_data_t, message_id_t>( data ); 00301 00302 if( msg_id == msg_t::END_TO_END_MESSAGE ) 00303 { 00304 uint16_t i; // run variable that is used for several for loops 00305 msg_t* msg = (msg_t*)data; 00306 00307 debug_->debug( "\aReceived message from %d with destination %d with size %d.\n", id, msg->dest(), msg->buffer_size() ); 00308 00309 // check if received message is already contained inside buffer, if so do not add 00310 for( i=0; i < MAX_NUMBER_OF_STORED_MESSAGES; ++i ) 00311 { 00312 if( stored_messages_[i].used && 00313 stored_messages_[i].message.seq_no() == msg->seq_no() && 00314 stored_messages_[i].message.source() == msg->source() 00315 ){ 00316 return; 00317 } 00318 } 00319 00320 // if destination is in roomba's range, directly deliver 00321 if( neighbor_discovery_->is_neighbor_bidi( msg->dest() ) ) 00322 { 00323 debug_->debug( "Delivering message directly to %d, size: %d.\n", msg->dest(), msg->buffer_size() ); 00324 radio_->send( msg->dest(), msg->buffer_size(), (block_data_t*)msg ); 00325 00326 statistics_->print_statistics_comment( "direct message delivery at time:" ); 00327 00328 //Save statistics output 00329 typename Os::Clock clock; 00330 statistics_->print_statistics( 00331 clock.seconds( clock.time() ) - starting_time_, 00332 number_of_stored_messages_, 00333 0 00334 ); 00335 return; 00336 } 00337 00338 /* 00339 * Put message into buffer. This is done by find the first empty slot. If no slot is empty, the oldest 00340 * message will be deleted and the new message will be put to that position. 00341 */ 00342 uint16_t cur_max = 0; 00343 uint16_t cur_max_index = 0; 00344 00345 for( i=0; i < MAX_NUMBER_OF_STORED_MESSAGES; ++i ) 00346 { 00347 if( !stored_messages_[i].used ) 00348 { 00349 break; 00350 } else { 00351 if( stored_messages_[i].store_duration > cur_max ) 00352 { 00353 cur_max = stored_messages_[i].store_duration; 00354 cur_max_index = i; 00355 } 00356 } 00357 } 00358 00359 if( i == MAX_NUMBER_OF_STORED_MESSAGES ) // All slots are used. 00360 { 00361 // Drop oldest message by the new message. 00362 i = cur_max_index; 00363 number_of_stored_messages_--; 00364 00365 debug_->debug( "Could not deliver message from %d to %d. (buffer overflow)\n", 00366 stored_messages_[cur_max_index].message.source(), 00367 stored_messages_[cur_max_index].message.dest() ); 00368 statistics_->print_statistics_comment( "Message buffer overflowed: deleted oldest message!" ); 00369 } 00370 00371 //Store the message in slot i 00372 stored_messages_[i].used = true; 00373 stored_messages_[i].store_duration = 0; 00374 memcpy( &stored_messages_[i].message, msg, msg->buffer_size() ); 00375 number_of_stored_messages_++; 00376 00377 debug_->debug( "number of stored messages: %d\n", number_of_stored_messages_); 00378 00379 // clear list of roombas that got current state of message buffer 00380 for( i=0; i < MAX_NUMBER_OF_ROOMBAS; ++i ) 00381 { 00382 roomba_interchanged_msg_table_[i].used = false; 00383 } 00384 00385 //Save statistics output 00386 typename Os::Clock clock; 00387 statistics_->print_statistics( 00388 clock.seconds( clock.time() ) - starting_time_, 00389 number_of_stored_messages_, 00390 1 00391 ); 00392 } 00393 } 00394 00395 struct message_entry 00396 { 00397 bool used; 00398 uint16_t store_duration; 00399 msg_t message; 00400 }; 00401 00402 struct roomba_interchanged_msg_entry 00403 { 00404 bool used; 00405 node_id_t roomba_id; 00406 }; 00407 00408 Radio* radio_; 00409 int starting_time_; // starting time that shall be substracted from current event time for statistics 00410 typename Debug::self_pointer_t debug_; 00411 typename Timer::self_pointer_t timer_; 00412 typename Clock::self_pointer_t clock_; 00413 typename Roomba::self_pointer_t roomba_; 00414 NeighborDiscovery* neighbor_discovery_; 00415 RoombaStatistics* statistics_; 00416 00417 roomba_interchanged_msg_entry roomba_interchanged_msg_table_[MAX_NUMBER_OF_ROOMBAS]; // saves for current buffer which roombas alread got buffer dump 00418 message_entry stored_messages_[MAX_NUMBER_OF_STORED_MESSAGES]; 00419 uint16_t number_of_stored_messages_; 00420 int radio_recv_callback_id_; 00421 bool stopped_; 00422 }; 00423 00424 #endif //DELAYED_MESSAGE_DELIVERY