// Wiselib
00001 #ifndef __MAXMIND_CLUSTER_FORMATION_H_ 00002 #define __MAXMIND_CLUSTER_FORMATION_H_ 00003 00004 #include "util/delegates/delegate.hpp" 00005 #include "algorithms/cluster/clustering_types.h" 00006 #include "util/base_classes/clustering_base.h" 00007 00008 #undef DEBUG 00009 // Uncomment to enable Debug 00010 #define DEBUG 00011 00012 namespace wiselib { 00020 template<typename OsModel_P, 00021 typename HeadDecision_P, 00022 typename JoinDecision_P, 00023 typename Iterator_P> 00024 class MaxmindCore 00025 : public ClusteringBase <OsModel_P> { 00026 public: 00027 00028 //TYPEDEFS 00029 typedef OsModel_P OsModel; 00030 typedef HeadDecision_P HeadDecision_t; 00031 typedef JoinDecision_P JoinDecision_t; 00032 typedef Iterator_P Iterator_t; 00033 typedef typename OsModel::Radio Radio; 00034 typedef typename OsModel::Timer Timer; 00035 typedef typename OsModel::Debug Debug; 00036 typedef MaxmindCore<OsModel_P, HeadDecision_P, JoinDecision_P, Iterator_P> self_t; 00037 00038 //DATA TYPES 00039 typedef int cluster_id_t; 00040 typedef int cluster_level_t; //quite useless within current scheme, supported for compatibility issues 00041 typedef typename Radio::node_id_t node_id_t; 00042 typedef typename Radio::size_t size_t; 00043 typedef typename Radio::block_data_t block_data_t; 00044 00045 //delegates 00046 typedef delegate1<void, int> cluster_delegate_t; 00047 00048 /* 00049 * Constructor 00050 * */ 00051 MaxmindCore() : 00052 maxhops_(5), 00053 round_(0) { 00054 } 00055 ; 00056 00057 /* 00058 * Destructor 00059 * */ 00060 ~MaxmindCore() { 00061 } 00062 ; 00064 00065 /* 00066 * INIT 00067 * initializes the values of radio timer and debug 00068 */ 00069 void init(Radio& radio, Timer& timer, Debug& debug) { 00070 radio_ = &radio; 00071 timer_ = &timer; 00072 debug_ = &debug; 00073 }; 00074 00075 bool is_cluster_head(){ 00076 return chd().is_cluster_head(); 00077 } 00078 00079 /* SET functions */ 00080 00081 // Set the iterator Module 00082 00083 void 
set_iterator(Iterator_t &it) { 00084 it_ = ⁢ 00085 } 00086 00087 // Set the join_decision Module 00088 00089 void set_join_decision(JoinDecision_t &jd) { 00090 jd_ = &jd; 00091 } 00092 00093 // Set the cluster_head_decision Module 00094 00095 void set_cluster_head_decision(HeadDecision_t &chd) { 00096 chd_ = &chd; 00097 } 00098 00099 // Set the theta value 00100 00101 void set_maxhops(int maxhops) { 00102 maxhops_ = maxhops; 00103 } 00104 00105 /* GET functions */ 00106 00107 // Get the Node Parent 00108 00109 node_id_t parent(void) { 00110 return it().parent(); 00111 } 00112 00113 // Get the cluster_id 00114 00115 cluster_id_t cluster_id(void) { 00116 return it().cluster_id(); 00117 } 00118 00119 // Get the node_type 00120 00121 cluster_id_t node_type(void) { 00122 return it().node_type(); 00123 } 00124 00125 int hops(){ 00126 return it().hops(); 00127 } 00128 00129 //MAXMIND ONLY CALLBACKS 00130 00131 /* 00132 * callback from the cluster_head_decision_ 00133 * When a head is to be decided, chd_ 00134 * needs the winner vector from jd_ 00135 * 00136 * argument: pointer to the winner array 00137 */ 00138 void winner(node_id_t * mem_pos) { 00139 // get the winner from join_decision_ 00140 jd().get_winner(mem_pos); 00141 00142 } 00143 00144 /* callback from the cluster_head_decision_ 00145 * When a head is to be decided, chd_ 00146 * needs the sender vector from jd_ 00147 * 00148 * argument: pointer to the sender array 00149 */ 00150 void sender(node_id_t * mem_pos) { 00151 // get the sender from join_decision_ 00152 jd().get_sender(mem_pos); 00153 00154 } 00155 00156 00157 /* SHOW all the known nodes */ 00158 void present() { 00159 it().present_neibhors(); 00160 00161 } 00162 00163 /* 00164 * Enable 00165 * enables the bfsclustering module 00166 * initializes values 00167 * registers callbacks 00168 * */ 00169 void enable(void); 00170 /* 00171 * Disable 00172 * disables the bfsclustering module 00173 * unregisters callbacks 00174 * */ 00175 void disable(void); 00176 
00177 void timer_expired(void * data); 00178 00179 /* 00180 * Find_head 00181 * starts clustering procedure 00182 * */ 00183 void find_head(); 00184 00185 /* 00186 * Convergecast 00187 * Start a procedure where 00188 * outer nodes inform inner nodes for their status 00189 * */ 00190 void convergecast(void * data); 00191 //useless now 00192 void receive_next(int num); 00193 00194 00195 #ifdef DEBUG 00196 00197 int mess_flood() { 00198 return mess_flood_; 00199 } 00200 00201 int mess_inform() { 00202 return mess_inform_; 00203 00204 } 00205 00206 int mess_convergecast() { 00207 return mess_convergecast_; 00208 } 00209 00210 int mess_rejoin() { 00211 return mess_rejoin_; 00212 } 00213 #endif 00214 00215 00216 protected: 00217 /* 00218 * RECEIVE 00219 * respond to the new messages received 00220 * callback from the radio 00221 * */ 00222 void receive(node_id_t receiver, size_t len, block_data_t *data); 00223 00224 private: 00225 00226 00227 int callback_id_; //receive callbalck 00228 int next_callback_id_; // 00229 int winner_callback_id_; // get the winner list callback chd_<>jd_ 00230 int sender_callback_id_; // get the sender list callback chd_<>jd_ 00231 int maxhops_; // clustering parameter 00232 int round_; // the "synchronous" round of the algorithm 00233 00234 #ifdef DEBUG 00235 int mess_flood_; 00236 int mess_inform_; 00237 int mess_convergecast_; 00238 int mess_rejoin_; 00239 00240 void inc_mess_flood() { 00241 mess_flood_++; 00242 } 00243 00244 void inc_mess_inform() { 00245 mess_inform_++; 00246 00247 } 00248 00249 void inc_mess_convergecast() { 00250 mess_convergecast_++; 00251 } 00252 00253 void inc_mess_rejoin() { 00254 mess_rejoin_++; 00255 } 00256 00257 void reset_mess_counters() { 00258 mess_flood_ = 0; 00259 mess_inform_ = 0; 00260 mess_convergecast_ = 0; 00261 mess_rejoin_ = 0; 00262 } 00263 #endif 00264 00265 00266 HeadDecision_t *chd_; // cluster_head_decision_ module 00267 JoinDecision_t *jd_; // join_decision_ module 00268 Iterator_t *it_; // 
iterator_ module 00269 00270 HeadDecision_t& chd() { 00271 return *chd_; 00272 } 00273 00274 JoinDecision_t& jd() { 00275 return *jd_; 00276 } 00277 00278 Iterator_t& it() { 00279 return *it_; 00280 } 00281 00282 00283 static const int time_slice_ = 1000; // timeslice for checking once per 1500 msec (simulates an once per round) 00284 00285 Radio * radio_; // radio module 00286 Timer * timer_; // timer module 00287 Debug * debug_; // debug module 00288 00289 Radio& radio() { 00290 return *radio_; 00291 } 00292 00293 Timer& timer() { 00294 return *timer_; 00295 } 00296 00297 Debug& debug() { 00298 return *debug_; 00299 } 00300 00301 }; 00302 00303 template<typename OsModel_P, 00304 typename HeadDecision_P, 00305 typename JoinDecision_P, 00306 typename Iterator_P> 00307 void MaxmindCore<OsModel_P, HeadDecision_P, JoinDecision_P, 00308 Iterator_P>::enable(void) { 00309 // initialize 00310 chd().init(radio(), debug()); 00311 jd().init(radio(), debug()); 00312 it().init(radio(), timer(), debug()); 00313 #ifdef DEBUG 00314 //reset values 00315 reset_mess_counters(); 00316 #endif 00317 round_ = 0; 00318 it().enable(); 00319 chd().enable(); 00320 jd().enable(); 00321 00322 // Enable the Radio 00323 00324 radio().enable_radio(); 00325 00326 00327 00328 00329 00330 00331 00332 // Register receive callback 00333 // to enable receiving new messages 00334 callback_id_ 00335 = radio().template reg_recv_callback<self_t, &self_t::receive > ( 00336 this); 00337 // Set os pointer for iterator 00338 00339 00340 // register next callback 00341 next_callback_id_ = it().template reg_next_callback<self_t, 00342 &self_t::receive_next > (this); 00343 00344 //MAXMIND 00345 //register the callback from the cluster_head_decision_ to the iterator 00346 winner_callback_id_ = chd().template reg_winner_callback<self_t, 00347 &self_t::winner > (this); 00348 sender_callback_id_ = chd().template reg_sender_callback<self_t, 00349 &self_t::sender > (this); 00350 00351 //MAXMIND 00352 
chd().set_theta(maxhops_); 00353 jd().set_id(radio().id()); 00354 chd().set_id(radio().id()); 00355 it().set_id(radio().id()); 00356 jd().set_theta(maxhops_); 00357 00358 00359 /* 00360 * All needed structures and modules 00361 * are now initialized and we can start 00362 * clustering 00363 * 00364 * Call find_head() to find the new clusterhead 00365 * 00366 * */ 00367 find_head(); 00368 } 00369 00370 template<typename OsModel_P, 00371 typename HeadDecision_P, 00372 typename JoinDecision_P, 00373 typename Iterator_P> 00374 void MaxmindCore<OsModel_P, HeadDecision_P, JoinDecision_P, Iterator_P>::disable(void) { 00375 // Unregister the callbacks 00376 radio().unreg_recv_callback(callback_id_); 00377 it().unreg_next_callback(next_callback_id_); 00378 chd().unreg_winner_callback(winner_callback_id_); 00379 chd().unreg_sender_callback(sender_callback_id_); 00380 // Disable the Radio 00381 //radio().disable(); 00382 00383 } 00384 00385 template<typename OsModel_P, 00386 typename HeadDecision_P, 00387 typename JoinDecision_P, 00388 typename Iterator_P> 00389 void MaxmindCore<OsModel_P, HeadDecision_P, JoinDecision_P, 00390 Iterator_P>::receive(node_id_t from, size_t len, block_data_t* data) { 00391 // if message is from myself Ignore it 00392 if (radio().id() == from) return; 00393 00394 // data[0] shows the type of the message 00395 int type = data[0]; 00396 00397 // Check all supported Message Types and act differently 00398 if (type == FLOOD) { 00399 #ifdef DEBUG 00400 debug().debug("RECEIVED FLOOD %x <- %x\n", radio().id(), from); 00401 #endif 00402 /* 00403 * If a flooding message then pass it to jd_ 00404 * First or Second Stage of clustering so 00405 * it is needed to build the sender and winner lists 00406 * 00407 * */ 00408 jd().join(data, len); 00409 } 00410 if (type == INFORM) { 00411 00412 #ifdef DEBUG 00413 debug().debug("RECEIVED INFORM %x <- %x\n", radio().id(), from); 00414 #endif 00415 /* 00416 * If an inform message then pas it to it_ 00417 * 4th stage 
of clustering 00418 * get to know the neiborhod 00419 * */ 00420 it().inform(data, len); 00421 } 00422 if (type == CONVERGECAST) { 00423 00424 /* 00425 * If a convergecast message check if the 00426 * destination else ignore 00427 * 00428 * cluster_heads End Convergecast Messages 00429 * simple_nodes Forward Convergecast Messages 00430 * gateway_nodes Start Convergecast Messages 00431 * 00432 * */ 00433 00434 #ifdef DEBUG 00435 debug().debug("RECEIVED CONVERGECAST %x <- %x\n", radio().id(), from); 00436 #endif 00437 00438 // if cluster head finish the convergecast 00439 if (is_cluster_head()) { 00440 #ifdef DEBUG 00441 debug().debug("Node_type= HEAD\n"); 00442 #endif 00443 cluster_id_t child_cluster; 00444 memcpy(&child_cluster,data+1+2*sizeof(node_id_t),sizeof(cluster_id_t)); 00445 // Check if Child Node needs to correct its cluster_id 00446 if (cluster_id() != child_cluster) { 00447 #ifdef DEBUG 00448 debug().debug( 00449 "status= WRONG message_cluster= %x my_cluster= %x\n", 00450 child_cluster, 00451 cluster_id() 00452 ); 00453 #endif 00454 // RULE 4 of CLustering 00455 /* 00456 * To correct a nodes cluster_head 00457 * send a REJOIN message to the node in question 00458 * */ 00459 // rejoin messages have size 6 00460 size_t mess_size = it().get_payload_length(REJOIN); 00461 // create the rejoin message 00462 block_data_t m_sid[mess_size]; 00463 node_id_t child_id; 00464 memcpy(&child_id,data+1+sizeof(node_id_t),sizeof(node_id_t)); 00465 #ifdef DEBUG 00466 debug().debug("SEND REJOIN %x -> %x ", 00467 radio().id(), 00468 from 00469 ); 00470 inc_mess_rejoin(); 00471 #endif 00472 it().get_rejoin_payload(m_sid, child_id); 00473 00474 // do send the message 00475 radio().send(from, mess_size, 00476 m_sid); 00477 00478 00479 it().remove_from_non_cluster(child_id); 00480 it().add_to_cluster(child_id); 00481 }// Same cluster , just inform my structs 00482 else { 00483 #ifdef DEBUG 00484 debug().debug("status= CORRECT\n"); 00485 #endif 00486 } 00487 00488 
it().eat_convergecast(data, len); 00489 00490 00491 00492 }// if the node is a simple node forward the message to the cluster_head 00493 else { 00494 00495 //Get the data from the message 00496 it().eat_convergecast(data, len); 00497 00498 #ifdef DEBUG 00499 debug().debug("Node_type= SIMPLE\n"); 00500 #endif 00501 00502 00503 radio().send(it().parent(), len, data); 00504 00505 #ifdef DEBUG 00506 debug().debug("SEND CONVERGECAST %x -> %x \n", 00507 radio().id(), 00508 it().parent() 00509 ); 00510 inc_mess_convergecast(); 00511 #endif 00512 00513 00514 00515 } 00516 00517 00518 } 00519 00520 /* 00521 * If a REJOIN message check for 00522 * cluster id problems else forward or ignore 00523 * 00524 * nodes forward and check rejoin messages that were 00525 * sent only from their selected parents 00526 * */ 00527 00528 if (type == REJOIN) { 00529 00530 // local message copy 00531 block_data_t payload[len]; 00532 memcpy(payload, data, len); 00533 #ifdef DEBUG 00534 debug().debug("RECEIVED REJOIN %x <- %x\n", radio().id(), from); 00535 #endif 00536 // get message destination node 00537 node_id_t destination_node ; 00538 memcpy(&destination_node, payload+1,sizeof(node_id_t)); 00539 // get message ttl 00540 size_t ttl; 00541 memcpy(&ttl,payload+1+sizeof(node_id_t)+sizeof(cluster_id_t),sizeof(size_t)); 00542 00543 /* 00544 * if the destination node 00545 * check and correct the cluster head 00546 * stop the message from moving forward 00547 * */ 00548 if (destination_node == radio().id()) { 00549 00550 00551 00552 // get the new cluster_id from the message 00553 cluster_id_t mess_cluster_id ; 00554 memcpy(&mess_cluster_id,payload+1+sizeof(node_id_t),sizeof(cluster_id_t)); 00555 00556 // change my cluster_id if there is a difference 00557 if (mess_cluster_id!= cluster_id()) { 00558 #ifdef DEBUG 00559 debug().debug("action= REJOIN old= %x new= %x\n", cluster_id(), mess_cluster_id); 00560 #endif 00561 // do change the cluster id 00562 it().set_cluster_id(mess_cluster_id); 00563 
00564 this->state_changed(CLUSTER_HEAD_CHANGED); // callback to wiselib.processor 00565 } 00566 }/* 00567 * if not the destination node 00568 * check to see if it was sent by my parent 00569 * if it was use it to check my cluster_id 00570 * and forward to my children 00571 * */ 00572 else { 00573 00574 00575 if (!is_cluster_head()) { 00576 // check the sender of the message 00577 if (it().parent() == from) { 00578 // get the new cluster_id from the message 00579 cluster_id_t mess_cluster_id; 00580 memcpy(&mess_cluster_id, payload + 1 + sizeof (node_id_t), sizeof (cluster_id_t)); 00581 // change my cluster_id if there is a difference 00582 if (mess_cluster_id != cluster_id()) { 00583 // do change the cluster id 00584 it().set_cluster_id(mess_cluster_id); 00585 00586 this->state_changed(CLUSTER_HEAD_CHANGED); // callback to wiselib.processor 00587 } 00588 00589 /* 00590 * if the message ttl has not 00591 * expired then forward the message 00592 * */ 00593 if (--ttl >= 0) { 00594 // get the destination node from the original message 00595 00596 // set the new ttl value 00597 memcpy(payload + 1 + sizeof (node_id_t) + sizeof (cluster_id_t), &ttl, sizeof (size_t)); 00598 #ifdef DEBUG 00599 debug().debug("action= FORWARD\n"); 00600 debug().debug("SEND REJOIN %x -> %x [%x |%x |%d ]\n", 00601 radio().id(), 00602 Radio::BROADCAST_ADDRESS, 00603 destination_node, 00604 mess_cluster_id, 00605 ttl 00606 ); 00607 00608 inc_mess_rejoin(); 00609 #endif 00610 00611 // do forward the message 00612 radio().send(Radio::BROADCAST_ADDRESS, len, payload); 00613 } else { 00614 } 00615 } else { 00616 } 00617 } 00618 } 00619 00620 } 00621 00622 00623 } 00624 00625 template<typename OsModel_P, 00626 typename HeadDecision_P, 00627 typename JoinDecision_P, 00628 typename Iterator_P> 00629 void MaxmindCore<OsModel_P, HeadDecision_P, JoinDecision_P, 00630 Iterator_P>::find_head() { 00631 #ifdef DEBUG 00632 if (round_ == 0) { 00633 debug().debug("1st stage - flooding\n"); 00634 } 00635 #endif 
00636 if (round_ < 2 * maxhops_) { 00637 00638 00639 // get the message size 00640 size_t mess_size = jd().get_payload_length(FLOOD); 00641 // check if a flood_payload is supported 00642 block_data_t m_sid[mess_size]; 00643 00644 #ifdef DEBUG 00645 debug().debug("SEND FLOOD %x -> %x ", 00646 radio().id(), 00647 Radio::BROADCAST_ADDRESS 00648 ); 00649 inc_mess_flood(); 00650 #endif 00651 // get the payload 00652 jd().get_flood_payload(m_sid); 00653 00654 // send the FLOOD message 00655 radio().send(Radio::BROADCAST_ADDRESS, mess_size, m_sid); 00656 00657 00658 00659 00660 round_++; //move to next round 00661 00662 // Reset the timer 00663 timer().template set_timer<self_t, &self_t::timer_expired > ( 00664 time_slice_, this, (void*) 0); 00665 } else { 00666 #ifdef DEBUG 00667 debug().debug("stage 2 - calculate head\n"); 00668 #endif 00669 00670 if (chd().calculate_head()) { 00671 00672 // i am cluster head 00673 cluster_id_t cluster_id = chd().cluster_id(); 00674 node_id_t parent = chd().parent(); 00675 it().set_cluster_id(cluster_id); 00676 it().set_node_type(HEAD); // set my node_type as head 00677 it().set_parent(parent); // set myself as my parent (USED for visualization) 00678 it().set_hops(0); // set hop distance from head 00679 00680 00681 this->state_changed(CLUSTER_HEAD_CHANGED); 00682 00683 } else { 00684 // i am not cluster_head 00685 00686 cluster_id_t cluster_id = chd().cluster_id(); 00687 node_id_t parent = chd().parent(); 00688 it().set_node_type(UNCLUSTERED); // set my node type as simple node 00689 it().set_parent(parent); // set my parent 00690 it().set_cluster_id(cluster_id); 00691 00692 00693 } 00694 00695 // notify neibhors for my cluster_id 00696 #ifdef DEBUG 00697 debug().debug(" head= DECIDED\n"); 00698 debug().debug("stage 3 - send inform\n"); 00699 #endif 00700 00701 00702 // get the message size 00703 int mess_size = it().get_payload_length(INFORM); 00704 // check if the inform_payload exists 00705 00706 // get the payload 00707 
block_data_t m_sid[mess_size]; 00708 #ifdef DEBUG 00709 debug().debug("SEND INFORM %x -> %x ", 00710 radio().id(), 00711 Radio::BROADCAST_ADDRESS 00712 ); 00713 00714 inc_mess_inform(); 00715 #endif 00716 it().get_inform_payload(m_sid); 00717 00718 // send the payload 00719 radio().send(Radio::BROADCAST_ADDRESS, mess_size, m_sid); 00720 00721 round_++; // move to next round 00722 00723 00724 //set timer and after expire if gateway start convergecast 00725 timer().template set_timer<self_t, &self_t::convergecast > ( 00726 time_slice_, this, (void*) 0); 00727 00728 00729 } 00730 00731 } 00732 00733 template<typename OsModel_P, 00734 typename HeadDecision_P, 00735 typename JoinDecision_P, 00736 typename Iterator_P> 00737 void MaxmindCore<OsModel_P, HeadDecision_P, JoinDecision_P, 00738 Iterator_P>::receive_next(int num) { 00739 } 00740 00741 template<typename OsModel_P, 00742 typename HeadDecision_P, 00743 typename JoinDecision_P, 00744 typename Iterator_P> 00745 void MaxmindCore<OsModel_P, HeadDecision_P, JoinDecision_P, 00746 Iterator_P>::timer_expired(void * data) { 00747 // if timer expired check to find head 00748 find_head(); 00749 00750 } 00751 00752 // timer expired2 00753 00754 template<typename OsModel_P, 00755 typename HeadDecision_P, 00756 typename JoinDecision_P, 00757 typename Iterator_P> 00758 void MaxmindCore<OsModel_P, HeadDecision_P, JoinDecision_P, 00759 Iterator_P>::convergecast(void * data) { 00760 #ifdef DEBUG 00761 debug().debug("stage 4 - convergecast\n"); 00762 #endif 00763 00764 00765 /* 00766 * After the inform stage all gateway nodes 00767 * start reporting to their cluster heads 00768 * 00769 * Only gateway nodes report their status 00770 * 00771 * */ 00772 // check i a gateway node 00773 if (node_type() == GATEWAY) { 00774 #ifdef DEBUG 00775 debug().debug("Convergecast %x\n", radio().id()); 00776 #endif 00777 // create the convergecast message 00778 int mess_size = it().get_payload_length(CONVERGECAST); 00779 00780 block_data_t 
m_sid[mess_size]; 00781 #ifdef DEBUG 00782 00783 debug().debug("SEND CONVERGECAST %x -> %x ", 00784 radio().id(), 00785 it().parent() 00786 ); 00787 inc_mess_convergecast(); 00788 #endif 00789 it().get_convergecast_payload(m_sid); 00790 // do send the convergecast messages 00791 radio().send(it().parent(), mess_size, m_sid); 00792 00793 } 00794 #ifdef DEBUG 00795 debug().debug("waiting..."); 00796 #endif 00797 00798 00799 } 00800 00801 00802 } 00803 00804 #endif