// Wiselib
00001 #ifndef __LCA_H_ 00002 #define __LCA_H_ 00003 00004 #include "util/delegates/delegate.hpp" 00005 #include "algorithms/cluster/clustering_types.h" 00006 #include "util/base_classes/clustering_base.h" 00007 #include "algorithms/neighbor_discovery/echo.h" 00008 #include "algorithms/neighbor_discovery/pgb_payloads_ids.h" 00009 #include "algorithms/cluster/join_message.h" 00010 #include "algorithms/cluster/join_accept_message.h" 00011 #include "algorithms/cluster/resume_message.h" 00012 #include "algorithms/cluster/reform_message.h" 00013 00014 #undef DEBUG 00015 // Uncomment to enable Debug 00016 #define DEBUG 00017 #ifdef DEBUG 00018 //#define DEBUG_PAYLOADS 00019 #endif 00020 00021 namespace wiselib { 00029 template<typename OsModel_P, typename Radio_P, typename HeadDecision_P, 00030 typename JoinDecision_P, typename Iterator_P> 00031 class LcaCore: public ClusteringBase<OsModel_P> { 00032 public: 00033 //TYPEDEFS 00034 typedef OsModel_P OsModel; 00035 // os modules 00036 typedef Radio_P Radio; 00037 typedef typename OsModel::Timer Timer; 00038 typedef typename OsModel::Debug Debug; 00039 typedef typename OsModel::Rand Rand; 00040 //algorithm modules 00041 typedef HeadDecision_P HeadDecision_t; 00042 typedef JoinDecision_P JoinDecision_t; 00043 typedef Iterator_P Iterator_t; 00044 // self_type 00045 typedef LcaCore<OsModel_P, Radio_P, HeadDecision_P, JoinDecision_P, 00046 Iterator_P> self_type; 00047 typedef wiselib::Echo<OsModel, Radio, Timer, Debug> nb_t; 00048 00049 // data types 00050 typedef int cluster_level_t; //quite useless within current scheme, supported for compatibility issues 00051 typedef typename Radio::node_id_t node_id_t; 00052 typedef typename Radio::size_t size_t; 00053 typedef typename Radio::block_data_t block_data_t; 00054 typedef node_id_t cluster_id_t; 00055 00056 /* 00057 * Constructor 00058 * */ 00059 LcaCore() : 00060 probability_(30), maxhops_(4), enabled_(false), status_(0), round_(0), 00061 auto_reform_(0), reform_(false), 
head_lost_(false) { 00062 } 00063 00064 /* 00065 * Destructor 00066 * */ 00067 ~LcaCore() { 00068 } 00069 00070 /* 00071 * INIT 00072 * initializes the values of radio timer and debug 00073 */ 00074 inline void init(Radio& radio, Timer& timer, Debug& debug, Rand& rand, 00075 nb_t& neighbor_discovery) { 00076 radio_ = &radio; 00077 timer_ = &timer; 00078 debug_ = &debug; 00079 rand_ = &rand; 00080 neighbor_discovery_ = &neighbor_discovery; 00081 uint8_t flags = nb_t::DROPPED_NB | nb_t::LOST_NB_BIDI 00082 | nb_t::NEW_PAYLOAD_BIDI | nb_t::NB_READY; 00083 00084 neighbor_discovery_->template reg_event_callback<self_type, 00085 &self_type::neighbor_discovery_callback> (CLUSTERING, flags, 00086 this); 00087 neighbor_discovery_->register_payload_space((uint8_t) CLUSTERING); 00088 } 00089 00090 // Set IT 00091 void set_iterator(Iterator_t &it) { 00092 it_ = ⁢ 00093 } 00094 00095 // Set JD 00096 void set_join_decision(JoinDecision_t &jd) { 00097 jd_ = &jd; 00098 } 00099 00100 // Set CHD 00101 void set_cluster_head_decision(HeadDecision_t &chd) { 00102 chd_ = &chd; 00103 } 00104 00105 // Set the CHD probability 00106 void set_probability(int prob) { 00107 probability_ = prob; 00108 } 00109 00110 // Set the JD maximum hops from CH 00111 void set_maxhops(int maxhops) { 00112 maxhops_ = maxhops; 00113 } 00114 00115 node_id_t parent() { 00116 return it().parent(); 00117 } 00118 00119 cluster_id_t cluster_id() { 00120 return it().cluster_id(); 00121 } 00122 00123 int node_type() { 00124 return it().node_type(); 00125 } 00126 00127 int hops() { 00128 return it().hops(); 00129 } 00130 00131 /* 00132 * The status Of the Clustering Algorithm 00133 * 1 means a cluster is being formed 00134 * 0 means cluster is formed 00135 */ 00136 inline uint8_t status() { 00137 //1 - forming 00138 //0 - formed 00139 if (enabled_) { 00140 return status_; 00141 } else { 00142 return 2; 00143 } 00144 } 00145 00146 /* 00147 * Returns the Cluster head 00148 * status of the node 00149 */ 00150 inline bool 
is_cluster_head() { 00151 if (node_type() != UNCLUSTERED) { 00152 return cluster_id() == radio().id(); 00153 } else { 00154 return false; 00155 } 00156 } 00157 00158 /* SHOW all known nodes */ 00159 00160 inline void present_neighbors(void) { 00161 if (!status()) { 00162 it().present_neighbors(); 00163 } 00164 } 00165 00166 /* 00167 * Enable 00168 * enables the mbfsclustering module 00169 * enable chd it and jd modules 00170 * initializes values 00171 * registers callbacks 00172 * calls find head to start clustering 00173 * */ 00174 void enable() { 00175 if (enabled_) 00176 return; 00177 enabled_ = true; 00178 //set clustering round to 0 00179 round_ = 0; 00180 rand().srand(radio().id()); 00181 00182 //initialize the clustering modules 00183 chd().init(radio(), debug(), rand()); 00184 jd().init(radio(), debug()); 00185 it().init(radio(), timer(), debug()); 00186 00187 #ifdef DEBUG 00188 debug().debug("Enable::%x::%d::%d::", radio().id(), probability_, 00189 maxhops_); 00190 #endif 00191 00192 // receive receive callback 00193 receive_callback_id_ = radio().template reg_recv_callback<self_type, 00194 &self_type::receive> (this); 00195 // set variables of other modules 00196 chd().set_probability(probability_); 00197 jd().set_maxhops(maxhops_); 00198 00199 timer().template set_timer<self_type, &self_type::form_cluster> ( 00200 time_slice_, this, (void *) maxhops_); 00201 } 00202 00203 /* 00204 * Disable 00205 * disables the bfsclustering module 00206 * unregisters callbacks 00207 * */ 00208 void disable() { 00209 // Unregister the callback 00210 radio().unreg_recv_callback(receive_callback_id_); 00211 } 00212 00213 // Call with a timer to start a reform procedure from the cluster head 00214 00215 inline void reform_cluster(void * parameter) { 00216 reform_ = true; 00217 } 00218 00219 // Start the procedure needed to form a cluster 00220 00221 inline void form_cluster(void * parameter) { 00222 if (!enabled_) 00223 return; 00224 00225 status_ = 1; 00226 00227 
//enabling 00228 chd().reset(); 00229 jd().reset(); 00230 it().reset(); 00231 00232 // start the procedure to find new head 00233 //find_head(0); 00234 timer().template set_timer<self_type, &self_type::find_head> ( 00235 time_slice_, this, (void *) 0); 00236 // reform is false as cluster is not yet formed 00237 reform_ = false; 00238 } 00239 00240 /* 00241 * FIND_HEAD 00242 * starts clustering 00243 * decides a head and then start clustering 00244 * from the head of each cluster 00245 * */ 00246 void find_head(void *) { 00247 if (chd().calculate_head() == true) { 00248 // set values for iterator and join_decision 00249 it().set_parent(radio().id()); 00250 it().set_cluster_id(radio().id()); 00251 jd().set_cluster_id(radio().id()); 00252 chd().set_probability(probability_); 00253 00254 // inform for state change 00255 this->state_changed(CLUSTER_HEAD_CHANGED); 00256 00257 if (auto_reform_ > 0) { 00258 timer().template set_timer<self_type, 00259 &self_type::reform_cluster> ( 00260 auto_reform_ * time_slice_, this, (void *) maxhops_); 00261 } 00262 status_ = 0; 00263 00264 JoinClusterMsg<OsModel, Radio> msg = 00265 jd().get_join_request_payload(); 00266 // send JOIN 00267 radio().send(Radio::BROADCAST_ADDRESS, msg.length(), 00268 (block_data_t *) &msg); 00269 #ifdef DEBUG 00270 debug().debug("Send::%x::%d::%x", radio().id(), msg.msg_id(), 00271 Radio::BROADCAST_ADDRESS); 00272 #endif 00273 00274 //Check after some time if Any accept messages were received 00275 //2*time_slice for messages to be sent and received 00276 timer().template set_timer<self_type, &self_type::timer_expired> (2 00277 * maxhops_ * time_slice_, this, 0); 00278 } else { 00279 timer().template set_timer<self_type, &self_type::wait_for_joins> ( 00280 maxhops_ * time_slice_, this, 0); 00281 } 00282 } 00283 00284 void wait_for_joins(void * data) { 00285 head_lost_ = false; 00286 00287 // if noone aroung as cluster head 00288 // become a cluster head and search for nodes 00289 if (it().node_type() == 
UNCLUSTERED) { 00290 #ifdef DEBUG 00291 debug().debug("Not clustered yet, Start own Cluster %x", 00292 radio().id()); 00293 #endif 00294 //become a cluster head - set probability to 100% 00295 //chd().set_probability(100); 00296 // start clustering 00297 find_head(0); 00298 } else { 00299 if (jd().hops() < maxhops_) { 00300 JoinClusterMsg<OsModel, Radio> msg = 00301 jd().get_join_request_payload(); 00302 // Forward message from previous node 00303 radio().send(Radio::BROADCAST_ADDRESS, msg.length(), 00304 (block_data_t *) &msg); 00305 #ifdef DEBUG 00306 debug().debug("Send::%x::%d::%x::%d::", radio().id(), 00307 msg.msg_id(), Radio::BROADCAST_ADDRESS, msg.length()); 00308 #endif 00309 00310 //set the timer to check for clustering end 00311 timer().template set_timer<self_type, &self_type::timer_expired> ( 00312 time_slice_, this, (void*) 0); 00313 } else { 00314 // if no more hops to propagate the join message finalize cluster formation 00315 timer_expired(0); 00316 } 00317 //notify for join 00318 this->state_changed(NODE_JOINED); 00319 } 00320 } 00321 00322 /* 00323 * TIMER_EXPIRED 00324 * if timer_expired is called and no 00325 * join messages were received by non CH nodes 00326 * node becomes a CH and starts its ow cluster 00327 * */ 00328 inline void timer_expired(void * timer_value) { 00329 // if none joind under the node 00330 if (!it().any_joined()) { 00331 // if not a cluster head 00332 if (it().node_type() != HEAD) { 00333 // create a resume message 00334 ResumeClusterMsg<OsModel, Radio> msg = 00335 it().get_resume_payload(); 00336 //do send the message 00337 radio().send(it().parent(), msg.length(), (block_data_t *) &msg); 00338 00339 #ifdef DEBUG 00340 debug().debug("Send::%x::%d::%x::", radio().id(), msg.msg_id(), 00341 it().parent()); 00342 #endif 00343 }// if a cluster head end the clustering under this branch 00344 else { 00345 this->state_changed(CLUSTER_FORMED); 00346 } 00347 status_ = 0; 00348 } 00349 } 00350 00351 protected: 00352 00353 void 
neighbor_discovery_callback(uint8_t event, node_id_t from, 00354 uint8_t len, uint8_t* data) { 00355 if (nb_t::NEW_PAYLOAD_BIDI == event) { 00356 receive_beacon(from, len, data); 00357 //reset my beacon according to the new status 00358 uint8_t buf[beacon_size()]; 00359 get_beacon(buf); 00360 if (neighbor_discovery_->set_payload((uint8_t) CLUSTERING, buf, 00361 beacon_size()) != 0) { 00362 #ifdef DEBUG 00363 debug_->debug("Error::%x::", radio_->id()); 00364 #endif 00365 } 00366 } else if ((nb_t::LOST_NB_BIDI == event) || (nb_t::DROPPED_NB == event)) { 00367 node_lost(from); 00368 #ifdef DEBUG 00369 debug().debug("Drop::%x::%x::", radio().id(), from); 00370 #endif 00371 } else if (nb_t::NB_READY == event) { 00372 // when neighborhood is ready start clustering 00373 enable(); 00374 uint8_t buf[beacon_size()]; 00375 get_beacon(buf); 00376 if (neighbor_discovery_->set_payload((uint8_t) CLUSTERING, buf, 00377 beacon_size()) != 0) { 00378 #ifdef DEBUG 00379 debug_->debug("Error::%x::", radio_->id()); 00380 #endif 00381 } 00382 } 00383 } 00384 00385 /* 00386 * Size of the payload to the ND module beacon 00387 */ 00388 size_t beacon_size() { 00389 JoinClusterMsg<OsModel, Radio> msg; 00390 //send a new join message using the beacon 00391 return msg.length(); 00392 } 00393 00394 /* 00395 * Receive a beacon payload 00396 * check for new head if needed 00397 * check if in need to reform 00398 */ 00399 void receive_beacon(node_id_t node_from, size_t len, uint8_t * data) { 00400 //receive the beacon data 00401 JoinClusterMsg<OsModel, Radio> msg; 00402 memcpy(&msg, data, len); 00403 node_id_t cluster = msg.cluster_id(); 00404 int hops = msg.hops(); 00405 00406 //if the connection to the cluster head was lost 00407 if (head_lost_) { 00408 00409 //if the beacon came from a cluster head 00410 if (node_from == cluster) { 00411 00412 // join him 00413 // inform iterator about the new cluster 00414 it().set_parent(node_from); 00415 it().set_cluster_id(cluster); 00416 
jd().set_cluster_id(cluster); 00417 it().set_hops(hops); 00418 jd().set_hops(hops); 00419 it().set_node_type(SIMPLE); 00420 it().node_joined(node_from); 00421 00422 // if joined , node state changed 00423 this->state_changed(NODE_JOINED); 00424 00425 //mark that the head_lost_ situation was resolved 00426 head_lost_ = false; 00427 timer_expired(0); 00428 } 00429 } 00430 00431 //SET the node lists accordingly 00432 if (cluster == radio().id()) { 00433 it().node_joined(node_from); 00434 } else { 00435 it().node_not_joined(node_from); 00436 } 00437 00438 //if message was sent from a cluster head 00439 if (node_from == cluster) { 00440 00441 //debug().debug("Got A Beacon node :%x from :%x status:%x\n",radio().id(),node_from,status_); 00442 00443 /* // if the messages says reform and it was sent by my 00444 // cluster head , and i am not already reforming 00445 if ((reform) && (status() == 0)) { 00446 //if ((reform)&&(node_from == cluster_id())&&(!status_)){ 00447 status_ = 1; 00448 #ifdef DEBUG 00449 debug().debug("Reform::%x", radio().id()); 00450 #endif 00451 //timer().template set_timer<self_type, &self_type::form_cluster > (time_slice_*0.9, this, (void *) maxhops_ ); 00452 form_cluster((void*) maxhops_); 00453 00454 } else*/ 00455 if (is_cluster_head()) { 00456 if (it().node_count(1) == 0) { 00457 if (cluster < cluster_id()) { 00458 debug().debug("Orphan::%x", radio().id()); 00459 // join him 00460 // inform iterator about the new cluster 00461 it().set_parent(node_from); 00462 it().set_cluster_id(cluster); 00463 jd().set_cluster_id(cluster); 00464 it().set_hops(hops); 00465 jd().set_hops(hops); 00466 it().set_node_type(SIMPLE); 00467 it().node_joined(node_from); 00468 00469 // if joined , node state changed 00470 this->state_changed(NODE_JOINED); 00471 00472 //create the resyme message 00473 ResumeClusterMsg<OsModel, Radio> msg = 00474 it().get_resume_payload(); 00475 //do send the message 00476 radio().send(it().parent(), msg.length(), 00477 (block_data_t *) 
&msg); 00478 #ifdef DEBUG 00479 debug().debug("Send::%x::%d::%x::%d::", radio().id(), 00480 msg.msg_id(), it().parent(), msg.length()); 00481 #endif 00482 } 00483 } 00484 } 00485 } else { 00486 //if the sender was my cluster head and is no more a CH 00487 if (node_from == cluster_id()) { 00488 node_lost(cluster_id()); 00489 } 00490 } 00491 } 00492 00493 /* 00494 * Get a payload 00495 * to save on a beacon message 00496 */ 00497 void get_beacon(uint8_t * mess) { 00498 JoinClusterMsg<OsModel, Radio> msg = jd().get_join_request_payload(); 00499 memcpy(mess, &msg, msg.length()); 00500 } 00501 00502 /* 00503 * RECEIVE 00504 * respond to the new messages received 00505 * callback from the radio 00506 * */ 00507 void receive(node_id_t from, size_t len, block_data_t *data) { 00508 00509 // drop own messages 00510 if (radio().id() == from) 00511 return; 00512 if (!neighbor_discovery_->is_neighbor_bidi(from)) 00513 return; 00514 00515 // get Type of Message 00516 uint8_t type = *data; 00517 00518 // type=JOIN 00519 if (type == JOIN) { 00520 #ifdef RECEIVE_DEBUG 00521 debug().debug("RECEIVED JOIN Node %x <- %x\n", radio().id(), from); 00522 #endif 00523 if (node_type() == HEAD) 00524 return; 00525 // try to join 00526 if (jd().join(data, len)) { 00527 // set values for iterator and join_decision 00528 it().set_parent(from); 00529 it().set_cluster_id(jd().cluster_id()); 00530 it().set_hops(jd().hops()); 00531 it().set_node_type(SIMPLE); 00532 it().node_joined(from); 00533 this->state_changed(NODE_JOINED); 00534 } 00535 } else if (type == RESUME) { 00536 #ifdef RECEIVE_DEBUG 00537 debug().debug("RECEIVED RESUME Node %x <- %x\n", radio().id(), from); 00538 #endif 00539 } 00540 } 00541 00542 /* 00543 * Called when ND lost contact with a node 00544 * If the node was cluster head 00545 * - start searching for new head 00546 * else 00547 * - remove node from known nodes 00548 */ 00549 inline void node_lost(node_id_t node) { 00550 //If the node was my CH 00551 if (node == 
cluster_id()) { 00552 //Reset Iterator 00553 it().reset(); 00554 //Mark as headless 00555 head_lost_ = true; 00556 //Timeout for new CH beacons 00557 timer().template set_timer<self_type, &self_type::wait_for_joins> ( 00558 maxhops_ * time_slice_, this, 0); 00559 } else { 00560 //if not my CH 00561 //Remove from Iterator 00562 it().drop_node(node); 00563 } 00564 } 00565 00566 private: 00567 int receive_callback_id_; // receive message callback 00568 int probability_; // clustering parameter 00569 int maxhops_; 00570 nb_t * neighbor_discovery_; 00571 bool enabled_; 00572 uint8_t status_; // the status of the clustering algorithm 00573 static const uint32_t time_slice_ = 2000; // time to wait for cluster accept replies 00574 int round_; 00575 int auto_reform_; //time to autoreform the clusters 00576 bool reform_; // flag to start reforming 00577 bool head_lost_; // flag when the head was lost 00578 00579 /* CLustering algorithm modules */ 00580 HeadDecision_t * chd_; 00581 JoinDecision_t * jd_; 00582 Iterator_t * it_; 00583 00584 Iterator_t& it() { 00585 return *it_; 00586 } 00587 00588 JoinDecision_t& jd() { 00589 return *jd_; 00590 } 00591 00592 HeadDecision_t& chd() { 00593 return *chd_; 00594 } 00595 00596 Radio * radio_; // radio module 00597 Timer * timer_; // timer module 00598 Debug * debug_; // debug module 00599 Rand * rand_; 00600 00601 Radio& radio() { 00602 return *radio_; 00603 } 00604 00605 Timer& timer() { 00606 return *timer_; 00607 } 00608 00609 Debug& debug() { 00610 return *debug_; 00611 } 00612 00613 Rand& rand() { 00614 return *rand_; 00615 } 00616 }; 00617 } 00618 #endif