Wiselib
|
00001 /*************************************************************************** 00002 ** This file is part of the generic algorithm library Wiselib. ** 00003 ** Copyright (C) 2008,2009 by the Wisebed (www.wisebed.eu) project. ** 00004 ** ** 00005 ** The Wiselib is free software: you can redistribute it and/or modify ** 00006 ** it under the terms of the GNU Lesser General Public License as ** 00007 ** published by the Free Software Foundation, either version 3 of the ** 00008 ** License, or (at your option) any later version. ** 00009 ** ** 00010 ** The Wiselib is distributed in the hope that it will be useful, ** 00011 ** but WITHOUT ANY WARRANTY; without even the implied warranty of ** 00012 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ** 00013 ** GNU Lesser General Public License for more details. ** 00014 ** ** 00015 ** You should have received a copy of the GNU Lesser General Public ** 00016 ** License along with the Wiselib. ** 00017 ** If not, see <http://www.gnu.org/licenses/>. ** 00018 ***************************************************************************/ 00019 #ifndef __ALGORITHMS_END_TO_END_C_END_TO_END_COMMUNICATION_H__H__ 00020 #define __ALGORITHMS_END_TO_END_C_END_TO_END_COMMUNICATION_H__H__ 00021 00022 #include "algorithms/e2ec/e2ec_message.h" 00023 #include "algorithms/neighbor_discovery/echo.h" 00024 #include "algorithms/neighbor_discovery/pgb_payloads_ids.h" 00025 00026 #include "algorithms/cluster/clustering_types.h" 00027 #include "util/pstl/vector_static.h" 00028 #include "util/pstl/priority_queue.h" 00029 #include "util/pstl/pair.h" 00030 #include "util/pstl/map_static_vector.h" 00031 #include "internal_interface/routing_table/routing_table_static_array.h" 00032 #include "util/delegates/delegate.hpp" 00033 00034 #include "algorithms/cluster/fronts/fronts_core.h" 00035 00036 #include "util/serialization/serialization.h" 00037 #include "util/base_classes/routing_base.h" 00038 #include "algorithms/cluster_radio/cluster_radio.h" 00039 00040 00041 //#define DEBUG_E2EC 00042 00043 #define TIMEOUT 2000 00044 00045 namespace wiselib { 00046 00050 template<typename OsModel_P, 00051 typename Radio_P, 00052 typename Neighbor_Discovery_P = wiselib::Echo<OsModel_P, Radio_P, typename OsModel_P::Timer, typename OsModel_P::Debug>, 00053 typename Cluster_P = wiselib::FrontsCore<OsModel_P, Radio_P, wiselib::AtributeClusterHeadDecision<OsModel_P, Radio_P>, wiselib::BfsJoinDecision<OsModel_P, Radio_P>, wiselib::FrontsIterator<OsModel_P, Radio_P> > > 00054 class EndToEndCommunication : public RoutingBase<OsModel_P, Radio_P> { 00055 public: 00056 00057 typedef OsModel_P OsModel; 00058 00059 typedef Radio_P Radio; 00060 typedef typename OsModel_P::Debug Debug; 00061 typedef typename OsModel_P::Clock Clock; 00062 typedef typename OsModel::Timer Timer; 00063 typedef typename OsModel::Rand Rand; 00064 typedef typename Radio::node_id_t node_id_t; 00065 typedef typename Radio::size_t size_t; 00066 typedef typename Radio::block_data_t block_data_t; 00067 typedef typename Radio::message_id_t message_id_t; 00068 typedef typename Timer::millis_t millis_t; 00069 typedef typename Clock::time_t time_t; 00070 00071 typedef Cluster_P Cluster; 00072 typedef Neighbor_Discovery_P NeighborDiscovery; 00073 typedef Neighbor_Discovery_P nb_t; 00074 00075 00076 typedef wiselib::E2ecMessage<OsModel, Radio> CommunicationMsg_t; 00077 typedef EndToEndCommunication<OsModel, Radio, NeighborDiscovery, Cluster> self_type; 00078 00079 typedef self_type* self_pointer_t; 00080 00081 typedef E2ecMessage<OsModel, Radio> E2ecMsg_t; 00082 00083 typedef typename wiselib::ClusterRadio<OsModel, Radio, nb_t, Cluster> cluster_radio_t; 00084 00085 00086 struct rrqs { 00087 node_id_t source; 00088 node_id_t dest; 00089 uint8_t seqno; 00090 time_t time; 00091 }; 00092 00093 struct route { 00094 node_id_t dest; 00095 uint8_t seqno; 00096 time_t time; 00097 uint8_t hops; 00098 node_id_t nodes[10]; 00099 }; 00100 00101 struct pending_message { 00102 E2ecMsg_t msg; 00103 time_t time; 00104 }; 00105 00106 typedef wiselib::vector_static<OsModel, struct pending_message, 10> pending_messages_t; 00107 typedef typename pending_messages_t::iterator pending_messages_it; 00108 00109 typedef wiselib::vector_static<OsModel, node_id_t, 3> robots_t; 00110 typedef typename robots_t::iterator robots_it; 00111 00112 typedef wiselib::vector_static<OsModel, struct rrqs, 10> rrq_t_t; 00113 typedef typename rrq_t_t::iterator rrq_t_it; 00114 00115 typedef wiselib::vector_static<OsModel, struct route, 5> routes_t; 00116 typedef typename routes_t::iterator routes_it; 00117 00118 // -------------------------------------------------------------------- 00119 enum SpecialNodeIds { 00120 BROADCAST_ADDRESS = Radio::BROADCAST_ADDRESS, 00121 NULL_NODE_ID = Radio::NULL_NODE_ID 00122 }; 00123 // -------------------------------------------------------------------- 00124 enum Restrictions { 00125 MESSAGE_SIZE = E2ecMsg_t::MAX_PAYLOAD_LENGTH 00126 }; 00127 00128 enum{ 00129 E2EC_MESSAGE = 111, 00130 }; 00131 00132 enum ErrorCodes{ 00133 SUCCESS = OsModel::SUCCESS, 00134 ERR_UNSPEC = OsModel::ERR_UNSPEC 00135 }; 00136 // -------------------------------------------------------------------- 00139 EndToEndCommunication() {} 00140 ~EndToEndCommunication() {} 00142 00145 void enable_radio(); 00146 void disable_radio(); 00148 00151 00153 void send(node_id_t receiver, size_t len, block_data_t *data ); 00154 void send(E2ecMsg_t *msg); 00155 int send_via_robot(E2ecMsg_t *msg); 00156 void radio_receive( node_id_t receiver, size_t len, block_data_t *data ); 00157 void arriving_robot(uint8_t event, node_id_t from, uint8_t len, uint8_t* data); 00158 bool is_robot(node_id_t address); 00159 00160 int init( Radio& , Timer& , Clock& , Debug& , Rand& , Cluster& ,NeighborDiscovery& , cluster_radio_t&); 00161 void destruct() {} 00162 00163 00164 void add_robot_id(node_id_t id) { 00165 robots.push_back(id); 00166 } 00167 00168 char *sprint_payload(E2ecMsg_t *msg) { 00169 00170 static char str[60]; 00171 int bytes_written=0; 00172 for(int i=0; i<msg->payload_size() ;i++) { 00173 bytes_written+=sprintf(str + bytes_written," %x",*(msg->payload() + i)); 00174 } 00175 str[bytes_written]='\0'; 00176 00177 return str; 00178 } 00179 00180 struct pending_message createMsg(E2ecMsg_t msg) { 00181 struct pending_message pm; 00182 pm.msg = msg; 00183 pm.time = clock().time(); 00184 00185 return pm; 00186 } 00187 00188 void addRoute(E2ecMsg_t* msg) { 00189 00190 struct route rt; 00191 rt.dest = msg->dest(); 00192 rt.hops = msg->hops(); 00193 memcpy(rt.nodes,msg->payload(),(msg->hops()+1)*sizeof(node_id_t)); 00194 rt.seqno = msg->seq_no(); 00195 rt.time = clock().time(); 00196 00197 for(routes_it 00198 it = routes.begin(); 00199 it != routes.end(); 00200 it++) { 00201 if (it->dest == msg->dest() 00202 ) { 00203 if (it->seqno > msg->seq_no()) { 00204 return; 00205 } 00206 // debug().debug("replacing with better route"); 00207 routes.erase(it); 00208 routes.push_back(rt); 00209 return; 00210 } 00211 } 00212 routes.push_back(rt); 00213 } 00214 00215 routes_it findRoute(node_id_t dest) { 00216 for(routes_it 00217 it = routes.begin(); 00218 it != routes.end(); 00219 it++) { 00220 if (it->dest == dest) { 00221 return it; 00222 } 00223 } 00224 00225 return routes.end(); 00226 } 00227 00228 int addRrq(E2ecMsg_t msg) { 00229 struct rrqs rrq; 00230 rrq.dest = msg.dest(); 00231 rrq.source = msg.source(); 00232 rrq.seqno = msg.seq_no(); 00233 rrq.time = clock().time(); 00234 00235 for(rrq_t_it 00236 it = rrq_messages.begin(); 00237 it != rrq_messages.end(); 00238 it++) { 00239 if (it->dest == msg.dest() 00240 && it->source == msg.source() 00241 ) { 00242 if (it->seqno == msg.seq_no()) { 00243 return -1; 00244 } 00245 else { 00246 return 1; 00247 } 00248 } 00249 } 00250 00251 rrq_messages.push_back(rrq); 00252 return 0; 00253 } 00254 00255 bool is_in_cluster(node_id_t dest) { 00256 //Check if the destination is within the cluster 00257 int cnodesNo = cluster().childs_count(); 00258 node_id_t cnodes[cnodesNo]; 00259 cluster().childs(cnodes); 00260 00261 for (int i=0; i < cnodesNo ; i++) { 00262 if (cnodes[i] == dest) { 00263 return true; 00264 } 00265 } 00266 return false; 00267 } 00268 00269 void cluster_radio_receive(node_id_t from, size_t len, block_data_t *data) { 00270 E2ecMsg_t* msg; 00271 msg = (E2ecMsg_t*)data; 00272 00273 //Only treat CommunicationMessages 00274 if(msg->msg_id() == E2ecMsg_t::E2EC_TYPE) { 00275 #ifdef DEBUG_E2EC 00276 debug().debug ("E2ec::cluster_radio_receive::%x::%x:: E2EC_TYPE msg(%x,%x,%d) pl[%s]",tx_radio_->id(),get_next_hop(msg), msg->source(), msg->dest(), msg->seq_no(),sprint_payload(msg)); 00277 #endif 00278 00279 if(radio().id() == msg->dest()) 00280 { 00281 notify_receivers(msg->source(), 00282 msg->payload_size() - (msg->hops()+1) * sizeof(node_id_t) , 00283 msg->payload()+ (msg->hops()+1) * sizeof(node_id_t)); 00284 00285 } 00286 else 00287 if (radio().id() == get_next_hop(msg)) { 00288 send(msg); 00289 } 00290 else { 00291 #ifdef DEBUG_E2EC 00292 debug().debug ("E2ec::send::%x::%x:: sending over cluster radio msg(%x,%x,%d) pl[%s]",tx_radio_->id(),get_next_hop(msg), msg->source(), msg->dest(), msg->seq_no(),sprint_payload(msg)); 00293 #endif 00294 clusterRadio().send(get_next_hop(msg), msg->buffer_size(), (unsigned char *)&((*msg))); 00295 } 00296 00297 } 00298 else if (msg->msg_id() == E2ecMsg_t::E2EC_RRQ_TYPE) { 00299 if ( addRrq(*msg) < 0 ) { 00300 #ifdef DEBUG_E2EC 00301 // debug().debug ("E2ec::cluster_radio_receive::%x::%x:: received E2EC_RRQ_TYPE INGORE", 00302 // tx_radio_->id(),from); 00303 #endif 00304 return; 00305 } 00306 #ifdef DEBUG_E2EC 00307 debug().debug ("E2ec::cluster_radio_receive::%x::%x:: received E2EC_RRQ_TYPE msg(%x,%x,%d)", 00308 tx_radio_->id(),from ,msg->source(), msg->dest(), msg->seq_no()); 00309 #endif 00310 00311 // memcpy(msg->payload(),(void *)(radio().id()),sizeof(node_id_t)); 00312 node_id_t myid = radio().id(); 00313 write<OsModel, block_data_t, node_id_t > (msg->payload() + msg->payload_size(), myid); 00314 msg->set_payload_size(msg->payload_size() + sizeof(node_id_t)); 00315 msg->set_hops(msg->hops()+1); 00316 00317 if(is_in_cluster(msg->dest()) || (msg->dest() == radio().id())) { 00318 msg->set_msg_id(E2ecMsg_t::E2EC_RPL_TYPE); 00319 #ifdef DEBUG_E2EC 00320 debug().debug ("E2ec::cluster_radio_receive::%x::%x:: sending E2EC_RPL_TYPE msg(%x,%x,%d)", 00321 tx_radio_->id(), get_reverse_hop(msg), 00322 read<OsModel, block_data_t, node_id_t>(msg->payload()), 00323 read<OsModel, block_data_t, node_id_t>(msg->payload() + sizeof(node_id_t) ), msg->seq_no()); 00324 #endif 00325 00326 clusterRadio().send(get_reverse_hop(msg), msg->buffer_size(), (unsigned char *)msg); 00327 } 00328 else { 00329 #ifdef DEBUG_E2EC 00330 debug().debug ("E2ec::cluster_radio_receive::%x::%x:: sending E2EC_RRQ_TYPE msg(%x,%x,%d)", 00331 tx_radio_->id(),from ,msg->source(), msg->dest(), msg->seq_no()); 00332 #endif 00333 clusterRadio().send(clusterRadio().BROADCAST_ADDRESS, msg->buffer_size(), (unsigned char *)msg); 00334 } 00335 } 00336 else if (msg->msg_id() == E2ecMsg_t::E2EC_RPL_TYPE) { 00337 #ifdef DEBUG_E2EC 00338 debug().debug ("E2ec::cluster_radio_receive::%x::%x:: received E2EC_RPL_TYPE msg(%x,%x,%d)", 00339 tx_radio_->id(),from ,msg->source(), msg->dest(), msg->seq_no()); 00340 #endif 00341 if (radio().id() == get_reverse_hop(msg)) { 00342 addRoute(msg); 00343 while(send_pending_messages(msg) == 1); 00344 } 00345 else if (radio().NULL_NODE_ID != get_reverse_hop(msg)){ 00346 #ifdef DEBUG_E2EC 00347 debug().debug ("E2ec::cluster_radio_receive::%x::%x:: sending E2EC_RPL_TYPE msg(%x,%x,%d)", 00348 tx_radio_->id(),from ,get_reverse_hop(msg), msg->dest(), msg->seq_no()); 00349 #endif 00350 clusterRadio().send(get_reverse_hop(msg), msg->buffer_size(), (unsigned char *)msg); 00351 } 00352 else { 00353 #ifdef DEBUG_E2EC 00354 debug().debug ("E2ec::cluster_radio_receive::%x::%x:: INVALID NEXT HOP DROPING E2EC_RPL_TYPE msg(%x,%x,%d)", 00355 tx_radio_->id(),from ,get_reverse_hop(msg), msg->dest(), msg->seq_no()); 00356 #endif 00357 00358 } 00359 } 00360 } 00361 00362 int send_pending_messages(E2ecMsg_t* msg) { 00363 //clusterRadio().present_neighbors(); 00364 //return; 00365 for (pending_messages_it 00366 it = pending_messages.begin(); 00367 it != pending_messages.end(); 00368 it++) { 00369 00370 if ((it->msg).dest() == msg->dest()) { 00371 E2ecMsg_t data_to_send((it->msg).source(),(it->msg).dest(),(it->msg).seq_no()); 00372 00373 data_to_send.set_hops(msg->hops()); 00374 data_to_send.set_msg_id(E2ecMsg_t::E2EC_TYPE); 00375 // msg->set_msg_id(E2ecMsg_t::E2EC_TYPE); 00376 00377 memcpy(data_to_send.payload(), 00378 msg->payload(), ((msg->hops() + 1) * sizeof(node_id_t))); 00379 // data_to_send.set_payload(((msg->hops() + 1) * sizeof(node_id_t)),msg->payload()); 00380 data_to_send.set_payload_size(data_to_send.payload_size() + ((msg->hops() + 1) * sizeof(node_id_t))); 00381 00382 memcpy(data_to_send.payload() + data_to_send.payload_size(), 00383 (it->msg).payload(), (it->msg).payload_size()); 00384 data_to_send.set_payload_size(data_to_send.payload_size() + (it->msg).payload_size()); 00385 00386 // memcpy(msg->payload() + ((msg->hops() + 1) * sizeof(node_id_t)), (it->msg).payload(), (it->msg).payload_size()); 00387 // msg->set_payload_size(msg->payload_size() + (it->msg).payload_size()); 00388 00389 #ifdef DEBUG_E2EC 00390 debug().debug ("E2ec::send::%x::%x:: sending over cluster radio msg(%x,%d,%d) pl[%s]", 00391 tx_radio_->id(),get_next_hop(&data_to_send),data_to_send.dest(),data_to_send.msg_id(),data_to_send.payload_size(), 00392 sprint_payload(&data_to_send)); 00393 #endif 00394 00395 // clusterRadio().present_neighbors(); 00396 00397 //clusterRadio().send(get_next_hop(msg), msg->buffer_size(), (unsigned char *)msg); 00398 clusterRadio().send(get_next_hop(&data_to_send), data_to_send.buffer_size(), (unsigned char *)&data_to_send); 00399 00400 // msg->set_payload_size(msg->payload_size() - (it->msg).payload_size()) 00401 pending_messages.erase(it); 00402 return 1; 00403 } 00404 } 00405 00406 return 0; 00407 // debug().debug("sending;;;;;;;;;"); 00408 } 00409 00410 node_id_t get_reverse_hop(E2ecMsg_t* msg) { 00411 for (int i=1; i<=msg->hops(); i++) { 00412 if (read<OsModel, block_data_t, node_id_t>(msg->payload() + sizeof(node_id_t)*i) == radio().id()) { 00413 return read<OsModel, block_data_t, node_id_t>(msg->payload() + sizeof(node_id_t)*(i-1)); 00414 } 00415 } 00416 00417 if (read<OsModel, block_data_t, node_id_t>(msg->payload()) == radio().id()) { 00418 return radio().id(); 00419 } 00420 else { 00421 return clusterRadio().NULL_NODE_ID; 00422 } 00423 } 00424 00425 node_id_t get_next_hop(E2ecMsg_t* msg) { 00426 for (int i=0; i<msg->hops(); i++) { 00427 if (read<OsModel, block_data_t, node_id_t>(msg->payload() + sizeof(node_id_t)*i) == radio().id()) { 00428 return read<OsModel, block_data_t, node_id_t>(msg->payload() + sizeof(node_id_t)*(i+1)); 00429 } 00430 } 00431 00432 if (read<OsModel, block_data_t, node_id_t>(msg->payload() + msg->hops()*sizeof(node_id_t)) == radio().id()) { 00433 return radio().id(); 00434 } 00435 else { 00436 return clusterRadio().NULL_NODE_ID; 00437 } 00438 00439 } 00440 00441 void send_rrq(E2ecMsg_t msg) { 00442 00443 E2ecMsg_t rrq_msg(msg.source(),msg.dest(), msg.seq_no()); 00444 rrq_msg.set_msg_id(E2ecMsg_t::E2EC_RRQ_TYPE); 00445 node_id_t myid = radio().id(); 00446 // memcpy(rrq_msg.payload(), (void *)&(myid), sizeof(node_id_t)); 00447 write<OsModel, block_data_t, node_id_t > (rrq_msg.payload(), myid); 00448 rrq_msg.set_payload_size(rrq_msg.payload_size() + sizeof(node_id_t)); 00449 00450 #ifdef DEBUG_E2EC 00451 debug().debug ("E2ec::sendRrq::%x::%x:: sending over cluster radio msg(%x,%x,%d) pl[%s]", 00452 tx_radio_->id(), cluster_radio_t::BROADCAST_ADDRESS, 00453 rrq_msg.source(), rrq_msg.dest(), rrq_msg.seq_no() 00454 ,sprint_payload(&rrq_msg)); 00455 // clusterRadio().present_neighbors(); 00456 #endif 00457 clusterRadio().send(cluster_radio_t::BROADCAST_ADDRESS, rrq_msg.buffer_size(), (unsigned char *)(&rrq_msg)); 00458 } 00459 00460 00461 void cleanup_pending_messages() { 00462 for (pending_messages_it 00463 it = pending_messages.begin(); 00464 it != pending_messages.end(); 00465 it++) { 00466 if ( (clock().seconds(clock().time()) - clock().seconds(it->time)) > 5) { 00467 #ifdef DEBUG_E2EC 00468 debug().debug("e2ec::cleanup::%x removing pending data message (%x,%x,%d)", radio().id(), it->msg.source(), it->msg.dest(), it->msg.seq_no()); 00469 #endif 00470 pending_messages.erase(it); 00471 cleanup_pending_messages(); 00472 return; 00473 } 00474 // else { 00475 // if (send_via_robot( &(it->msg) ) == 0) { 00476 // pending_messages.erase(it); 00477 // cleanup_pending_messages(); 00478 // return; 00479 // } 00480 // } 00481 } 00482 } 00483 00484 void cleanup_stale_rrq() { 00485 for (rrq_t_it 00486 it = rrq_messages.begin(); 00487 it != rrq_messages.end(); 00488 it++) { 00489 if ( (clock().seconds(clock().time()) - clock().seconds(it->time)) > 1) { 00490 #ifdef DEBUG_E2EC 00491 debug().debug("e2ec::cleanup::%x removing pending rrq message (%x,%x,%d)", radio().id(), it->source, it->dest, it->seqno); 00492 #endif 00493 // rrq_messages.erase(it,it); 00494 rrq_messages.erase(it); 00495 cleanup_stale_rrq(); 00496 return; 00497 } 00498 } 00499 } 00500 00501 00502 void update_routes(void *a) { 00503 for (pending_messages_it 00504 it = pending_messages.begin(); 00505 it != pending_messages.end(); 00506 it++) { 00507 if (addRrq(it->msg) != 1 && (findRoute((it->msg).dest())==routes.end()) ) { 00508 send_rrq(it->msg); 00509 } 00510 } 00511 00512 for (routes_it 00513 it = routes.begin(); 00514 it != routes.end(); 00515 it++) { 00516 send_rrq(E2ecMsg_t(radio().id(), it->dest, (it->seqno)+1)); 00517 } 00518 00519 if (pending_messages.size() == 0 00520 && (empty_q == false) ) { 00521 debug().debug("E2EP;%d", pending_messages.size()); 00522 empty_q = true; 00523 } 00524 00525 timer().template set_timer<EndToEndCommunication,&EndToEndCommunication::update_routes>( 5000, this, 0 ); 00526 } 00527 00528 void cleanup(void *a) { 00529 00530 00531 cleanup_pending_messages(); 00532 00533 cleanup_stale_rrq(); 00534 00535 00536 if (pending_messages.size() > 0) { 00537 empty_q = false; 00538 debug().debug("E2EP;%d", pending_messages.size()); 00539 } 00540 00541 timer().template set_timer<EndToEndCommunication,&EndToEndCommunication::cleanup>( 1000, this, 0 ); 00542 } 00543 00544 protected: 00545 typename Radio::self_pointer_t tx_radio_; 00546 typename Debug::self_pointer_t debug_; 00547 typename Clock::self_pointer_t clock_; 00548 typename Timer::self_pointer_t timer_; 00549 typename Cluster::self_type* cluster_; 00550 typename Rand::self_pointer_t rand_; 00551 00552 cluster_radio_t *ClusterRadio_; 00553 nb_t *neighbor_; 00554 00555 pending_messages_t pending_messages; 00556 robots_t robots; 00557 rrq_t_t rrq_messages; 00558 routes_t routes; 00559 00560 millis_t disconnected_node_timeout_; 00561 00562 int radio_recv_callback_id_; 00563 00564 uint8_t cur_seq_no; 00565 00566 uint32_t msg_received; 00567 bool empty_q; 00568 00569 Radio& radio() 00570 { 00571 return *tx_radio_; 00572 } 00573 Timer& timer() 00574 { 00575 return *timer_; 00576 } 00577 Debug& debug() 00578 { 00579 #ifdef SHAWN 00580 debug_->debug("\n"); 00581 #endif 00582 return *debug_; 00583 } 00584 00585 Cluster& cluster() 00586 { 00587 return *cluster_; 00588 } 00589 00590 Clock& clock() 00591 { 00592 return *clock_; 00593 } 00594 00595 nb_t& neighbor_discovery(){ 00596 return *neighbor_; 00597 } 00598 00599 cluster_radio_t& clusterRadio() { 00600 return *ClusterRadio_; 00601 } 00602 00603 }; 00604 00605 // ----------------------------------------------------------------------- 00606 00607 template<typename OsModel_P, 00608 typename Radio_P, 00609 typename Neighbor_Discovery_P, 00610 typename Cluster_P> 00611 int 00612 EndToEndCommunication<OsModel_P, Radio_P, Neighbor_Discovery_P,Cluster_P>:: 00613 init (Radio& tx_radio, Timer& timer, Clock& clock, Debug& debug, Rand& rand, Cluster& cluster, NeighborDiscovery& neighbor, cluster_radio_t& clusterRadio) { 00614 tx_radio_ = &tx_radio; 00615 timer_ = &timer; 00616 clock_ = &clock; 00617 rand_ = &rand; 00618 neighbor_ = &neighbor; 00619 debug_ = &debug; 00620 cluster_= &cluster; 00621 ClusterRadio_ = &clusterRadio; 00622 00623 cur_seq_no = 0; 00624 msg_received = 0; 00625 00626 #ifdef DEBUG_E2EC 00627 // debug().debug("E2Ec Algorithm: initialized"); 00628 #endif 00629 return SUCCESS; 00630 } 00631 00632 00633 template<typename OsModel_P, 00634 typename Radio_P, 00635 typename Neighbor_Discovery_P, 00636 typename Cluster_P> 00637 void 00638 EndToEndCommunication<OsModel_P, Radio_P, Neighbor_Discovery_P, Cluster_P>:: 00639 enable_radio() 00640 { 00641 empty_q = true; 00642 #ifdef DEBUG_E2EC 00643 debug().debug("E2Ec::enable_radio %x", tx_radio_->id()); 00644 #endif 00645 radio().enable_radio(); 00646 radio_recv_callback_id_ = radio().template reg_recv_callback<self_type, &self_type::radio_receive >(this); 00647 00648 clusterRadio().template reg_recv_callback<self_type, &self_type::cluster_radio_receive >(this); 00649 00650 // neighbor_discovery().register_payload_space( MOBILITY ); 00651 00652 uint8_t flags = nb_t::NEW_NB_BIDI|nb_t::DROPPED_NB; 00653 neighbor_discovery().template reg_event_callback<self_type, &self_type::arriving_robot>( MOBILITY, flags, this ); 00654 00655 timer().template set_timer<EndToEndCommunication,&EndToEndCommunication::cleanup>( 5000, this, 0 ); 00656 timer().template set_timer<EndToEndCommunication,&EndToEndCommunication::update_routes>( 5000, this, 0 ); 00657 } 00658 00659 // ----------------------------------------------------------------------- 00660 00661 template<typename OsModel_P, 00662 typename Radio_P, 00663 typename Neighbor_Discovery_P, 00664 typename Cluster_P> 00665 void 00666 EndToEndCommunication<OsModel_P, Radio_P, Neighbor_Discovery_P, Cluster_P>:: 00667 disable_radio(void) { 00668 #ifdef DEBUG_E2EC 00669 debug().debug("E2Ec::disable"); 00670 #endif 00671 //Unregister callbacks 00672 // neighbor_discovery().unregister_payload_space( MOBILITY ); 00673 // neighbor_discovery().unreg_recv_callback( MOBILITY ); 00674 radio().unreg_recv_callback( radio_recv_callback_id_ ); 00675 } 00676 00677 // ----------------------------------------------------------------------- 00678 00679 template<typename OsModel_P, 00680 typename Radio_P, 00681 typename Neighbor_Discovery_P, 00682 typename Cluster_P> 00683 void 00684 EndToEndCommunication<OsModel_P, Radio_P, Neighbor_Discovery_P, Cluster_P>:: 00685 send(node_id_t destination, size_t len, block_data_t *data) { 00686 00687 00688 cur_seq_no++; 00689 E2ecMsg_t msg(radio().id(), destination, cur_seq_no); 00690 msg.set_payload(len,data); 00691 send(&msg); 00692 } 00693 00694 template<typename OsModel_P, 00695 typename Radio_P, 00696 typename Neighbor_Discovery_P, 00697 typename Cluster_P> 00698 void 00699 EndToEndCommunication<OsModel_P, Radio_P, Neighbor_Discovery_P, Cluster_P>:: 00700 send(E2ecMsg_t *msg) { 00701 00702 if (neighbor_discovery().is_neighbor_bidi(msg->dest())) { 00703 00704 debug().debug ("E2E;%x;%d;%x", 00705 tx_radio_->id(), msg->msg_id(), msg->dest()); 00706 00707 #ifdef DEBUG_E2EC 00708 debug().debug ("E2ec::send::from%x::type%d::to%x:: sending to final destination msg(%x,%x,%d)", 00709 tx_radio_->id(), msg->msg_id(), msg->dest(), msg->source(), msg->dest(), msg->seq_no()); 00710 #endif 00711 radio().send(msg->dest(), msg->buffer_size(), (unsigned char *)msg); 00712 } 00713 else if (cluster().is_cluster_head()) { 00714 //Check if the destination is within the cluster 00715 int cnodesNo = cluster().childs_count(); 00716 node_id_t cnodes[cnodesNo]; 00717 cluster().childs(cnodes); 00718 00719 bool is_in_cluster = false; 00720 for (int i=0; i < cnodesNo ; i++) { 00721 if (cnodes[i] == msg->dest()) { 00722 is_in_cluster = true; 00723 } 00724 } 00725 00726 if (is_in_cluster) { 00727 00728 if ( cluster().get_next_node_to_child(msg->dest()) != radio().NULL_NODE_ID ) { 00729 debug().debug ("E2E;%x;%d;%x", 00730 tx_radio_->id(), msg->msg_id(), cluster().get_next_node_to_child(msg->dest())); 00731 #ifdef DEBUG_E2EC 00732 debug().debug ("E2ec::send::%x::%x:: sending in to cluster msg(%x,%x,%d) pl[%s]", 00733 tx_radio_->id(),cluster().get_next_node_to_child(msg->dest()), 00734 msg->source(),msg->dest(),msg->payload_size(), sprint_payload(msg)); 00735 #endif 00736 radio().send(cluster().get_next_node_to_child(msg->dest()), msg->buffer_size(), (unsigned char *)msg); 00737 } 00738 else 00739 { 00740 debug().debug ("E2E droping (not in cluster);%x;%d;%x", 00741 tx_radio_->id(), msg->msg_id(), cluster().get_next_node_to_child(msg->dest())); 00742 } 00743 00744 /* radio().send(msg->dest(), msg->buffer_size(), (unsigned char *)msg);*/ 00745 } 00746 else{ 00747 00748 routes_it it = findRoute(msg->dest()); 00749 if ( it == routes.end()) { 00750 pending_messages.push_back(createMsg(*msg)); 00751 if (addRrq(*msg) == 1 ){ 00752 return; 00753 } 00754 00755 E2ecMsg_t rrq_msg(msg->source(),msg->dest(), msg->seq_no()); 00756 rrq_msg.set_msg_id(E2ecMsg_t::E2EC_RRQ_TYPE); 00757 node_id_t myid = radio().id(); 00758 // memcpy(rrq_msg.payload(), (void *)&(myid), sizeof(node_id_t)); 00759 write<OsModel, block_data_t, node_id_t > (rrq_msg.payload(), myid); 00760 rrq_msg.set_payload_size(rrq_msg.payload_size() + sizeof(node_id_t)); 00761 00762 #ifdef DEBUG_E2EC 00763 debug().debug ("E2ec::sendRrq::%x::%x:: sending over cluster radio msg(%x,%x,%d) pl[%s]", 00764 tx_radio_->id(), cluster_radio_t::BROADCAST_ADDRESS, 00765 rrq_msg.source(), rrq_msg.dest(), rrq_msg.seq_no() 00766 ,sprint_payload(&rrq_msg)); 00767 // clusterRadio().present_neighbors(); 00768 #endif 00769 clusterRadio().send(cluster_radio_t::BROADCAST_ADDRESS, rrq_msg.buffer_size(), (unsigned char *)(&rrq_msg)); 00770 // send_via_robot(msg); 00771 // send_to_neighbor_clusters(); 00772 } 00773 else 00774 { 00775 E2ecMsg_t data_to_send(msg->source(),msg->dest(), msg->seq_no()); 00776 00777 data_to_send.set_hops(it->hops); 00778 data_to_send.set_msg_id(E2ecMsg_t::E2EC_TYPE); 00779 00780 memcpy(data_to_send.payload(), 00781 it->nodes, ((it->hops + 1) * sizeof(node_id_t))); 00782 data_to_send.set_payload_size(data_to_send.payload_size() + ((it->hops + 1) * sizeof(node_id_t))); 00783 00784 memcpy(data_to_send.payload() + data_to_send.payload_size(), 00785 msg->payload(), msg->payload_size()); 00786 data_to_send.set_payload_size(data_to_send.payload_size() + msg->payload_size()); 00787 00788 #ifdef DEBUG_E2EC 00789 debug().debug ("E2ec::send::%x::%x:: sending over cluster radio Route Cache Hit msg(%x,%d,%d) pl[%s]", 00790 tx_radio_->id(),get_next_hop(&data_to_send),data_to_send.dest(),data_to_send.msg_id(),data_to_send.payload_size(), 00791 sprint_payload(&data_to_send)); 00792 #endif 00793 00794 clusterRadio().send(get_next_hop(&data_to_send), data_to_send.buffer_size(), (unsigned char *)&data_to_send); 00795 00796 00797 } 00798 00799 } 00800 00801 //Send the message to the neighboring clusters 00802 // radio().send(cluster().parent(), msg.buffer_size(), &msg); 00803 } 00804 else { 00805 00806 if ( cluster().get_next_node_to_child(msg->dest()) != radio().NULL_NODE_ID ) { 00807 debug().debug ("E2E;%x;%d;%x", 00808 tx_radio_->id(), msg->msg_id(), cluster().get_next_node_to_child(msg->dest())); 00809 00810 #ifdef DEBUG_E2EC 00811 debug().debug ("E2ec::send::%x::%x:: sending to child msg(%x,%x,%d)",tx_radio_->id(),cluster().get_next_node_to_child(msg->dest()),msg->source(),msg->dest(),msg->seq_no()); 00812 #endif 00813 radio().send(cluster().get_next_node_to_child(msg->dest()), msg->buffer_size(), (unsigned char *)msg); 00814 } 00815 else if ( (cluster().parent() != radio().NULL_NODE_ID) 00816 && (cluster().parent() != 0xffff)) { 00817 debug().debug ("E2E;%x;%d;%x", 00818 tx_radio_->id(), msg->msg_id(), cluster().parent()); 00819 00820 #ifdef DEBUG_E2EC 00821 debug().debug ("E2ec::send::%x::%x:: sending to cluster leader msg(%x,%x,%d)",tx_radio_->id(),cluster().parent(),msg->source(),msg->dest(),msg->seq_no()); 00822 #endif 00823 radio().send(cluster().parent(), msg->buffer_size(), (unsigned char *)msg); 00824 } 00825 else { 00826 #ifdef DEBUG_E2EC 00827 debug().debug ("E2ec::send::%x:: INGORING (possible cluster is not formed) msg(%x,%x,%d)",tx_radio_->id(),msg->source(),msg->dest(),msg->seq_no()); 00828 #endif 00829 00830 } 00831 } 00832 } 00833 00834 template<typename OsModel_P, 00835 typename Radio_P, 00836 typename Neighbor_Discovery_P, 00837 typename Cluster_P> 00838 void 00839 EndToEndCommunication<OsModel_P, Radio_P, Neighbor_Discovery_P, Cluster_P>:: 00840 radio_receive( node_id_t from, size_t len, block_data_t *data ) { 00841 00842 // message_id_t msg_id = read<OsModel, block_data_t, message_id_t>( data ); 00843 00844 E2ecMsg_t* msg = (E2ecMsg_t*)data; 00845 00846 //Only treat CommunicationMessages 00847 if( msg->msg_id() == E2ecMsg_t::E2EC_TYPE ) 00848 { 00849 00850 if (radio().id() == msg->dest()) { 00851 msg_received++; 00852 #ifdef DEBUG_E2EC 00853 debug().debug ("E2ec::radio_receive::%x::%x:: received message msg(%x,%x,%d) (total received: %d) payload [%s]",tx_radio_->id(),msg->source(),msg->dest(),msg->dest(),msg->seq_no(),msg_received,sprint_payload(msg)); 00854 #endif 00855 00856 notify_receivers(msg->source(), msg->payload_size() - (msg->hops()+1) * sizeof(node_id_t) , msg->payload()+ (msg->hops()+1) * sizeof(node_id_t)); 00857 if (is_robot(from)) { 00858 } 00859 return; 00860 } 00861 00862 send(msg); 00863 // send(msg->dest(), msg->buffer_size(), (unsigned char *)msg); 00864 } 00865 00866 } 00867 00868 template<typename OsModel_P, 00869 typename Radio_P, 00870 typename Neighbor_Discovery_P, 00871 typename Cluster_P> 00872 int 00873 EndToEndCommunication<OsModel_P, Radio_P, Neighbor_Discovery_P, Cluster_P>:: 00874 send_via_robot(E2ecMsg_t *msg) { 00875 for (robots_it 00876 it = robots.begin(); 00877 it != robots.end(); 00878 it++) 00879 { 00880 if (neighbor_discovery().is_neighbor(*it)) 00881 { 00882 00883 radio().send(*it, msg->buffer_size(), (unsigned char *)msg); 00884 return 0; 00885 } 00886 } 00887 return 1; 00888 } 00889 00890 template<typename OsModel_P, 00891 typename Radio_P, 00892 typename Neighbor_Discovery_P, 00893 typename Cluster_P> 00894 bool 00895 EndToEndCommunication<OsModel_P, Radio_P, Neighbor_Discovery_P, Cluster_P>:: 00896 is_robot(node_id_t id) { 00897 00898 for (robots_it 00899 it = robots.begin(); 00900 it != robots.end(); 00901 it++) { 00902 if (id == *it) { 00903 return true; 00904 } 00905 } 00906 return false; 00907 } 00908 00909 template<typename OsModel_P, 00910 typename Radio_P, 00911 typename Neighbor_Discovery_P, 00912 typename Cluster_P> 00913 void 00914 EndToEndCommunication<OsModel_P, Radio_P, Neighbor_Discovery_P, Cluster_P>:: 00915 arriving_robot(uint8_t event, node_id_t from, uint8_t len, uint8_t* data) 00916 { 00917 00918 if ( nb_t::NEW_PAYLOAD == event ) { 00919 00920 } 00921 else if ( nb_t::NEW_PAYLOAD_BIDI == event ) { 00922 } 00923 /* 00924 * +====+====+====+====++====+====++====+====++====+====+ 00925 * CMD NODE TIME TIME NODE NODE SNBH SNBH 00926 */ 00927 else if ( nb_t::NEW_NB == event ) { 00928 } 00929 else if ( nb_t::NEW_NB_BIDI == event ) { 00930 if (is_robot(from)) { 00931 for (pending_messages_it 00932 it = pending_messages.begin(); 00933 it != pending_messages.end(); 00934 it++) { 00935 if ( (clock().seconds(clock().time()) - clock().seconds(it->time)) > 1) { 00936 radio().send(from, it->msg.buffer_size(), (unsigned char *)&((*it).msg)); 00937 // debug().debug ("arriving_robot::%x::%x:: sending to robot msg(%x,%x,%d)",tx_radio_->id(),it->msg.dest(),it->msg.source(),it->msg.dest(),it->msg.seq_no()); 00938 #ifdef DEBUG_E2EC 00939 debug().debug ("E2ec::arriving_robot::%x::%x:: sending to robot msg(%x,%x,%d)",tx_radio_->id(),it->msg.dest(),it->msg.source(),it->msg.dest(),it->msg.seq_no()); 00940 #endif 00941 } 00942 } 00943 pending_messages.clear(); 00944 } 00945 } 00946 else if ( nb_t::DROPPED_NB == event ) { 00947 } 00948 else if ( nb_t::LOST_NB_BIDI == event ) { 00949 } 00950 00951 } 00952 00953 } 00954 00955 00956 #endif