Wiselib
wiselib.testing/algorithms/roomba_message_delivery/roomba_message_delivery.h
Go to the documentation of this file.
00001 #ifndef __DELAYED_MESSAGE_DELIVERY_H__
00002 #define __DELAYED_MESSAGE_DELIVERY_H__
00003 
00004 #include <stdint.h>
00005 #include <sstream>
00006 
00007 #include "util/serialization/serialization.h"
00008 
00009 #include "roomba_statistics.h"
00010 
00011 #include "external_interface/pc/roomba.h"
00012 #include "external_interface/external_interface.h"
00013 
00014 #include "algorithms/neighbor_discovery/echo.h"
00015 #include "algorithms/neighbor_discovery/pgb_payloads_ids.h"
00016 #include "algorithms/end_to_end_communication/end_to_end_communication_msg.h"
00017 
00018 template<   typename OsModel_P,
00019          typename NeighborDiscovery_P,
00020          typename Roomba_P,
00021          typename Radio_P = typename OsModel_P::Radio,
00022          typename Debug_P = typename OsModel_P::Debug,
00023          typename Timer_P = typename OsModel_P::Timer,
00024          typename Clock_P = typename OsModel_P::Clock
00025          >
00026 class RoombaMessageDelivery {
00027    public:
00028       typedef OsModel_P Os;
00029       typedef NeighborDiscovery_P NeighborDiscovery;
00030       typedef Roomba_P Roomba;
00031       typedef Radio_P Radio;
00032       typedef Debug_P Debug;
00033       typedef Timer_P Timer;
00034       typedef Clock_P Clock;
00035 
00036       typedef RoombaMessageDelivery<Os,NeighborDiscovery, Roomba, Radio, Debug, Timer> self_type;
00037       typedef self_type* self_pointer_t;
00038 
00039       typedef typename wiselib::CommunicationMessage<Os, Radio> msg_t;
00040 
00041       typedef typename Radio::block_data_t block_data_t;
00042       typedef typename Radio::size_t size_t;
00043       typedef typename Radio::node_id_t node_id_t;
00044       typedef typename Radio::message_id_t message_id_t;
00045 
00046       enum {
00047          MAX_NUMBER_OF_STORED_MESSAGES = 255, // maximum number of stored messages
00048       };
00049 
00050       enum {
00051          MAX_NUMBER_OF_ROOMBAS = 10,   // maximum number of Roombas in system, 
00052                      //gives size for roomba-interchange table
00053       };
00054 
00055       void init( NeighborDiscovery& neighbor_discovery, 
00056             Roomba& roomba, 
00057             Radio& radio, 
00058             Debug& debug, 
00059             Timer& timer, 
00060             Clock& clock
00061       ) {
00062          stopped_ = true;
00063 
00064          radio_ = &radio;
00065          debug_ = &debug;
00066          timer_ = &timer;
00067          clock_ = &clock;
00068          roomba_ = &roomba;
00069          neighbor_discovery_ = &neighbor_discovery;
00070 
00071          init();
00072       }
00073 
00074 
00075       void init()
00076       {
00077          // enable statistics output
00078          typename Os::Clock clock;
00079          starting_time_ = clock.seconds( clock.time() );
00080          std::ostringstream time_string;
00081          time_string << "statistics_" << starting_time_ << ".txt";
00082          statistics_ = new RoombaStatistics( time_string.str() );
00083 
00084          radio_->enable_radio();
00085          radio_recv_callback_id_ = radio_->template reg_recv_callback<self_type, &self_type::on_receive>( this );
00086 
00087          neighbor_discovery_->enable();
00088          if( neighbor_discovery_->register_payload_space( MOBILITY ) )
00089             debug_->debug( "Could not register payload space in neighbor discovery module!\n" );
00090 
00091          uint8_t data = 1;
00092          neighbor_discovery_->set_payload( MOBILITY, &data, sizeof( data ) );
00093 
00094          uint8_t flags = NeighborDiscovery::NEW_NB_BIDI;
00095          neighbor_discovery_->template reg_event_callback<self_type, &self_type::on_new_neighbor>( MOBILITY, flags, this );
00096          flags = NeighborDiscovery::NEW_PAYLOAD_BIDI;
00097          neighbor_discovery_->template reg_event_callback<self_type, &self_type::on_new_payload>( MOBILITY, flags, this );
00098 
00099 
00100          // Initially no messages are stored.
00101          for( uint16_t i=0; i < MAX_NUMBER_OF_STORED_MESSAGES; ++i )
00102          {
00103             stored_messages_[i].used = false;
00104          }
00105          number_of_stored_messages_ = 0;
00106 
00107          // Initially no Roomba is known
00108          for( uint16_t i=0; i < MAX_NUMBER_OF_ROOMBAS; ++i )
00109          {
00110             roomba_interchanged_msg_table_[i].used = false;
00111          }
00112 
00113          timer_->template set_timer<self_type, &self_type::on_time>( 1000, this, 0 );
00114 
00115          stopped_ = false;
00116       }
00117 
00118       int destruct()
00119       {
00120          stopped_ = true;
00121 
00122          //Unregister callbacks
00123          neighbor_discovery_->unregister_payload_space( MOBILITY );
00124          neighbor_discovery_->unreg_recv_callback( MOBILITY );
00125          radio_->unregister_recv_callback( radio_recv_callback_id_ );
00126 
00127          neighbor_discovery_->disable();
00128          radio_->disable_radio();
00129 
00130          for( uint16_t i=0; i < MAX_NUMBER_OF_STORED_MESSAGES; ++i )
00131          {
00132             stored_messages_[i].used = false;
00133          }
00134          number_of_stored_messages_ = 0;
00135 
00136          statistics_->close_statistics();
00137 
00138          return Os::SUCCES;
00139       }
00140 
00145       uint16_t number_of_stored_messages()
00146       {
00147          return number_of_stored_messages_;
00148       }
00149 
00150    protected:
00160       void on_new_neighbor( uint8_t event, node_id_t from, uint8_t len, uint8_t* data ) {
00161          debug_->debug( "on_new_neighbor %d\n", from );
00162 
00163          if( !stopped_ )
00164          {
00165             if( ( event & NeighborDiscovery::NEW_NB_BIDI ) != 0 )
00166             {
00167                for( int i=0; i < MAX_NUMBER_OF_STORED_MESSAGES; i++ )
00168                {
00169                   if( stored_messages_[i].used )
00170                   {
00171                      msg_t* msg = &stored_messages_[i].message;
00172 
00173                      if( msg->dest() == from )
00174                      {
00175                         debug_->debug( "Delivering message to %d, size: %d.\n", msg->dest(), msg->buffer_size() );
00176                         // Deliver stored message
00177                         radio_->send( msg->dest(), msg->buffer_size(), (block_data_t*)msg );
00178 
00179                         stored_messages_[i].used = false;
00180                         number_of_stored_messages_--;
00181                         
00182                         //Save statistics output
00183                         typename Os::Clock clock;
00184                         statistics_->print_statistics(
00185                            clock.seconds( clock.time() ) - starting_time_,
00186                            number_of_stored_messages_,
00187                            -1
00188                         );
00189                      }
00190                   }
00191                }
00192 
00193                debug_->debug( "number of stored messages: %d\n", number_of_stored_messages_);
00194             }
00195          }
00196       }
00197 
00198       void on_new_payload( uint8_t event, node_id_t from, uint8_t len, uint8_t* data ) {
00199          //debug_->debug( "on_new_payload %d, %d, %d\n", event, from, *data );
00200 
00201          uint16_t i;
00202          if( *data ) {
00203             for( i=0; i<MAX_NUMBER_OF_ROOMBAS; ++i )
00204             {
00205                if ( // return if at pos i is roomba
00206                      roomba_interchanged_msg_table_[i].used == true &&
00207                      from == roomba_interchanged_msg_table_[i].roomba_id
00208                ) {
00209                   return;
00210                }
00211             }
00212 
00213             // hence, at this point we know that the roomba did not get the data, yet
00214             for( i=0; i < MAX_NUMBER_OF_ROOMBAS; ++i )
00215             {
00216                if( roomba_interchanged_msg_table_[i].used == false )
00217                {
00218                  roomba_interchanged_msg_table_[i].used = true;
00219                  roomba_interchanged_msg_table_[i].roomba_id = from;
00220 
00221                  break;
00222                }
00223             }
00224             if( i == MAX_NUMBER_OF_ROOMBAS ) // All slots are used.
00225             {
00226                // Drop first one, but give error message!
00227                // this should not happen!
00228                roomba_interchanged_msg_table_[0].used = true;
00229                roomba_interchanged_msg_table_[0].roomba_id = from;
00230 
00231                debug_->debug( "Too many roombas in system for proper data dump exchange!\n" );
00232             }
00233          }
00234 
00235          // now send all stored messages to other roomba
00236          for( i=0; i<MAX_NUMBER_OF_STORED_MESSAGES; ++i )
00237          {
00238             msg_t* msg = &stored_messages_[i].message;
00239             radio_->send( msg->dest(), msg->buffer_size(), (block_data_t*)msg );
00240          }
00241       }
00242 
00247       void on_time( void* userdata )
00248       {
00249          //debug_->debug( "on_time: trying to deliver %d messages\n", number_of_stored_messages_ );
00250          if( !stopped_ )
00251          {
00252             for( int i=0; i < MAX_NUMBER_OF_STORED_MESSAGES; i++ )
00253             {
00254                if( stored_messages_[i].used )
00255                {
00256                   msg_t* msg = &stored_messages_[i].message;
00257 
00258                   if( neighbor_discovery_->is_neighbor_bidi( msg->dest() ) ) 
00259                   {
00260                      debug_->debug( "Delivering message to %d, size: %d.\n", msg->dest(), msg->buffer_size() );
00261                      radio_->send( msg->dest(), msg->buffer_size(), (block_data_t*)msg );
00262                      stored_messages_[i].used = false;
00263                      number_of_stored_messages_--;
00264                      
00265                      //Save statistics output
00266                      typename Os::Clock clock;
00267                      statistics_->print_statistics(
00268                         clock.seconds( clock.time() ) - starting_time_,
00269                         number_of_stored_messages_,
00270                         -1
00271                      );
00272                   } else {
00273                      stored_messages_[i].store_duration++;
00274                   }
00275                }
00276             }
00277          }
00278 
00279          if( number_of_stored_messages_ > 0 )
00280          {
00281             debug_->debug( "\a" );
00282          }
00283 
00284          timer_->template set_timer<self_type, &self_type::on_time>( 1000, this, 0 );
00285       }
00286 
00287 
00288 
00297       void on_receive( node_id_t id, size_t size, block_data_t* data) {
00298          debug_->debug( "on_receive from %d, msg_id: %d\n", id, wiselib::read<Os, block_data_t, message_id_t>( data ) );
00299 
00300          message_id_t msg_id = wiselib::read<Os, block_data_t, message_id_t>( data );
00301 
00302          if( msg_id == msg_t::END_TO_END_MESSAGE )
00303          {
00304             uint16_t i;    // run variable that is used for several for loops
00305             msg_t* msg = (msg_t*)data;
00306 
00307             debug_->debug( "\aReceived message from %d with destination %d with size %d.\n", id, msg->dest(), msg->buffer_size() );
00308 
00309             // check if received message is already contained inside buffer, if so do not add
00310             for( i=0; i < MAX_NUMBER_OF_STORED_MESSAGES; ++i )
00311             {
00312                if( stored_messages_[i].used &&
00313                   stored_messages_[i].message.seq_no() == msg->seq_no() &&
00314                   stored_messages_[i].message.source() == msg->source()
00315                ){
00316                   return;
00317                }
00318             }
00319 
00320             // if destination is in roomba's range, directly deliver
00321             if( neighbor_discovery_->is_neighbor_bidi( msg->dest() ) )
00322             {
00323                debug_->debug( "Delivering message directly to %d, size: %d.\n", msg->dest(), msg->buffer_size() );
00324                radio_->send( msg->dest(), msg->buffer_size(), (block_data_t*)msg );
00325 
00326                statistics_->print_statistics_comment( "direct message delivery at time:" );
00327 
00328                //Save statistics output
00329                typename Os::Clock clock;
00330                statistics_->print_statistics(
00331                   clock.seconds( clock.time() ) - starting_time_,
00332                   number_of_stored_messages_,
00333                   0
00334                );
00335                return;
00336             }
00337 
00338             /*
00339              * Put message into buffer. This is done by find the first empty slot. If no slot is empty, the oldest
00340              * message will be deleted and the new message will be put to that position.
00341              */
00342             uint16_t cur_max = 0;
00343             uint16_t cur_max_index = 0;
00344 
00345             for( i=0; i < MAX_NUMBER_OF_STORED_MESSAGES; ++i )
00346             {
00347                if( !stored_messages_[i].used )
00348                {
00349                   break;
00350                } else {
00351                   if( stored_messages_[i].store_duration > cur_max )
00352                   {
00353                      cur_max = stored_messages_[i].store_duration;
00354                      cur_max_index = i;
00355                   }
00356                }
00357             }
00358 
00359             if( i == MAX_NUMBER_OF_STORED_MESSAGES ) // All slots are used.
00360             {
00361                // Drop oldest message by the new message.
00362                i = cur_max_index;
00363                number_of_stored_messages_--;
00364 
00365                debug_->debug( "Could not deliver message from %d to %d. (buffer overflow)\n",
00366                      stored_messages_[cur_max_index].message.source(),
00367                      stored_messages_[cur_max_index].message.dest() );
00368                statistics_->print_statistics_comment( "Message buffer overflowed: deleted oldest message!" );
00369             }
00370 
00371             //Store the message in slot i
00372             stored_messages_[i].used = true;
00373             stored_messages_[i].store_duration = 0;
00374             memcpy( &stored_messages_[i].message, msg, msg->buffer_size() );
00375             number_of_stored_messages_++;
00376 
00377             debug_->debug( "number of stored messages: %d\n", number_of_stored_messages_);
00378 
00379             // clear list of roombas that got current state of message buffer
00380             for( i=0; i < MAX_NUMBER_OF_ROOMBAS; ++i )
00381             {
00382                roomba_interchanged_msg_table_[i].used = false;
00383             }
00384 
00385             //Save statistics output
00386             typename Os::Clock clock;
00387             statistics_->print_statistics(
00388                clock.seconds( clock.time() ) - starting_time_,
00389                number_of_stored_messages_,
00390                1
00391             );
00392          }
00393       }
00394 
00395       struct message_entry
00396       {
00397          bool used;
00398          uint16_t store_duration;
00399          msg_t message;
00400       };
00401 
00402       struct roomba_interchanged_msg_entry
00403       {
00404          bool used;
00405          node_id_t roomba_id;
00406       };
00407 
00408       Radio* radio_;
00409       int starting_time_;                    // starting time that shall be substracted from current event time for statistics
00410       typename Debug::self_pointer_t debug_;
00411       typename Timer::self_pointer_t timer_;
00412       typename Clock::self_pointer_t clock_;
00413       typename Roomba::self_pointer_t roomba_;
00414       NeighborDiscovery* neighbor_discovery_;
00415       RoombaStatistics* statistics_;
00416 
00417       roomba_interchanged_msg_entry roomba_interchanged_msg_table_[MAX_NUMBER_OF_ROOMBAS]; // saves for current buffer which roombas alread got buffer dump
00418       message_entry stored_messages_[MAX_NUMBER_OF_STORED_MESSAGES];
00419       uint16_t number_of_stored_messages_;
00420       int radio_recv_callback_id_;
00421       bool stopped_;
00422 };
00423 
00424 #endif //DELAYED_MESSAGE_DELIVERY
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines