Wiselib
|
00001 #ifndef _BFSCLUSTERING_H 00002 #define _BFSCLUSTERING_H 00003 00004 #include "util/delegates/delegate.hpp" 00005 #include "util/pstl/vector_static.h" 00006 #include "algorithms/cluster/clustering_types.h" 00007 00008 #undef DEBUG 00009 #define DEBUG 00010 00011 namespace wiselib { 00012 00020 template<typename OsModel_P, 00021 typename Radio_P, typename Timer_P, typename Debug_P> 00022 class BfsClustering { 00023 public: 00024 00025 //TYPEDEFS 00026 typedef OsModel_P OsModel; 00027 typedef Radio_P Radio; 00028 typedef Timer_P Timer; 00029 typedef Debug_P Debug; 00030 typedef BfsClustering<OsModel_P, Radio_P, Timer_P, Debug_P> self_t; 00031 typedef self_t* self_pointer_t; 00032 00033 00034 //data types 00035 typedef int cluster_id_t; 00036 typedef int cluster_level_t; //quite useless within current scheme, supported for compatibility issues 00037 typedef typename Radio::node_id_t node_id_t; 00038 typedef typename Radio::size_t size_t; 00039 typedef typename Radio::block_data_t block_data_t; 00040 00041 typedef delegate1<void, int> cluster_delegate_t; 00042 00043 00044 typedef wiselib::vector_static<OsModel, node_id_t, 100 > vector_t; 00045 00046 /* set functions */ 00047 00048 // set the clustering parameter 00049 00050 void set_theta(int theta) { 00051 theta_ = theta; 00052 } 00053 00054 /* get functions */ 00055 00056 //get the cluster level (useless currently) 00057 00058 cluster_level_t cluster_level() { 00059 return 0; 00060 } 00061 00062 //get the cluster head status 00063 00064 bool cluster_head() { 00065 return cluster_head_; 00066 } 00067 00068 //get the parent value 00069 00070 node_id_t parent() { 00071 return parent_; 00072 } 00073 00074 //get the hops from head 00075 00076 size_t hops() { 00077 return hops_; 00078 } 00079 00080 //get the cluster id 00081 00082 cluster_id_t cluster_id() { 00083 return SID_; 00084 } 00085 00086 /* SHOW all the known nodes */ 00087 #ifdef DEBUG 00088 void present_neighbors() { 00089 #ifndef ISENSE_APP 00090 debug().debug("Present Node %d Neighbors:\n", radio().id()); 00091 debug().debug("Cluster: "); 00092 for (size_t i = 0; i < cluster_neighbhors_.size(); i++) { 00093 debug().debug("%d ", cluster_neighbhors_.at(i)); 00094 } 00095 debug().debug("\nNon-Cluster: "); 00096 for (size_t i = 0; i < non_cluster_neighbhors_.size(); i++) { 00097 debug().debug("%d ", non_cluster_neighbhors_.at(i)); 00098 } 00099 debug().debug("\n"); 00100 #else 00101 debug().debug("Present Node %x Neighbors:", radio().id()); 00102 debug().debug("Cluster: "); 00103 for (size_t i = 0; i < cluster_neighbhors_.size(); i++) { 00104 debug().debug("%x ", cluster_neighbhors_.at(i)); 00105 } 00106 debug().debug("Non-Cluster: "); 00107 for (size_t i = 0; i < non_cluster_neighbhors_.size(); i++) { 00108 debug().debug("%x ", non_cluster_neighbhors_.at(i)); 00109 } 00110 00111 #endif 00112 } 00113 #endif 00114 00115 00116 00117 /* CALLBACKS */ 00118 00119 template<class T, void (T::*TMethod)(int) > 00120 inline int reg_changed_callback(T *obj_pnt) { 00121 state_changed_callback_ = cluster_delegate_t::from_method<T, TMethod > (obj_pnt); 00122 return state_changed_callback_; 00123 } 00124 00125 void unreg_changed_callback(int idx) { 00126 state_changed_callback_ = cluster_delegate_t(); 00127 } 00128 00129 #ifdef DEBUG 00130 int mess_join() { 00131 return mess_join_; 00132 } 00133 00134 int mess_join_acc() { 00135 return mess_join_acc_; 00136 } 00137 00138 int mess_join_deny() { 00139 return mess_join_deny_; 00140 } 00141 00142 int mess_resume() { 00143 return mess_resume_; 00144 } 00145 #endif 00146 00147 /* 00148 * Enable 00149 * enables the bfsclustering module 00150 * initializes values 00151 * registers callbacks 00152 * and starts the 00153 * clustering procedure 00154 * */ 00155 void enable(void); 00156 00157 /* 00158 * Disable 00159 * disables the bfsclustering module 00160 * unregisters callbacks 00161 * */ 00162 void disable(void); 00163 00164 /* 00165 * TIMER_EXPIRED 00166 * if timer_expired is called and no 00167 * accept messages were received 00168 * there are no nodes under this one and 00169 * node has to respond to head with a resume message 00170 * */ 00171 void timer_expired(void *); 00172 00173 /* 00174 * RECEIVE 00175 * respond to the new messages received 00176 * callback from the radio 00177 * */ 00178 void receive(node_id_t receiver, size_t len, block_data_t *data); 00179 00180 00181 00182 /* 00183 * Constructor 00184 * */ 00185 BfsClustering() : 00186 cluster_head_(false), 00187 SID_(-1), 00188 parent_(-1), 00189 theta_(30), 00190 hops_(0), 00191 any_joined_(false) { 00192 cluster_neighbhors_.clear(); 00193 non_cluster_neighbhors_.clear(); 00194 } 00195 00196 /* 00197 * Destructor 00198 * */ 00199 ~BfsClustering() { 00200 } 00201 00202 /* 00203 * INIT 00204 * initializes the values of radio timer and debug 00205 */ 00206 void init(Radio& radio, Timer& timer, Debug& debug) { 00207 radio_ = &radio; 00208 timer_ = &timer; 00209 debug_ = &debug; 00210 }; 00211 00212 private: 00213 00214 bool cluster_head_; // if a cluster head 00215 cluster_id_t SID_; // the cluster id 00216 int callback_id_; // receive message callback 00217 cluster_delegate_t state_changed_callback_; // state changed callback to processor 00218 node_id_t parent_; // node's parent in cluster 00219 vector_t cluster_neighbhors_; // inside cluster neighbors 00220 vector_t non_cluster_neighbhors_; // outside cluster neighbors 00221 int theta_; // clustering parameter 00222 size_t hops_; // hops from cluster head 00223 bool any_joined_; // true if anyone joined under the node 00224 00225 #ifdef DEBUG 00226 // message sent counters 00227 int mess_join_; 00228 int mess_join_acc_; 00229 int mess_join_deny_; 00230 int mess_resume_; 00231 00232 void reset_mess_counters() { 00233 mess_join_ = 0; 00234 mess_join_acc_ = 0; 00235 mess_join_deny_ = 0; 00236 mess_resume_ = 0; 00237 } 00238 #endif 00239 00240 static const int time_slice_ = 2000; // time to receive accept messages 00241 00242 Radio * radio_; // radio pointer 00243 Timer * timer_; // timer pointer 00244 Debug * debug_; // debug pointer 00245 00246 Radio& radio() { 00247 return *radio_; 00248 } 00249 00250 Timer& timer() { 00251 return *timer_; 00252 } 00253 00254 Debug& debug() { 00255 return *debug_; 00256 } 00257 00258 00259 /* CONTROL the cluster and non cluster vectors */ 00260 // a node joins as a cluster neighbors 00261 void node_joined(node_id_t node) { 00262 for (size_t i = 0; i < cluster_neighbhors_.size(); i++) { 00263 if (node == cluster_neighbhors_.at(i)) { 00264 return; 00265 } 00266 } 00267 cluster_neighbhors_.push_back(node); 00268 00269 }; 00270 00271 // a node joins as a non cluster neighbors 00272 void node_not_joined(node_id_t node) { 00273 for (size_t i = 0; i < non_cluster_neighbhors_.size(); i++) { 00274 if (node == non_cluster_neighbhors_.at(i)) { 00275 return; 00276 } 00277 } 00278 non_cluster_neighbhors_.push_back(node); 00279 00280 }; 00281 }; 00282 00283 template<typename OsModel_P, 00284 typename Radio_P, typename Timer_P, typename Debug_P> 00285 void 00286 BfsClustering<OsModel_P, Radio_P, Timer_P, Debug_P>:: 00287 enable(void) { 00288 #ifdef DEBUG 00289 reset_mess_counters(); 00290 #endif 00291 // Initial values 00292 SID_ = -1; 00293 parent_ = -1; 00294 cluster_head_ = false; 00295 hops_ = 0; 00296 any_joined_ = false; 00297 00298 //enable radio 00299 00300 radio().enable_radio(); 00301 00302 // register receive callback to radio 00303 callback_id_ = radio().template reg_recv_callback<self_t, 00304 &self_t::receive > (this); 00305 00306 //-> Cluster Head Decision 00307 if (radio().id() % theta_ == 0) { 00308 00309 cluster_head_ = true; 00310 00311 } else { 00312 cluster_head_ = false; 00313 } 00314 00315 // if a cluster_head start the clustering 00316 if (cluster_head_) { 00317 00318 //Set cluster id to mine 00319 SID_ = radio().id(); 00320 //Set me as the parent node 00321 parent_ = radio().id(); 00322 //Callback to show cluster_head selection 00323 if (state_changed_callback_) state_changed_callback_(CLUSTER_HEAD_CHANGED); 00324 #ifdef ISENSE_APP 00325 debug().debug("START 1"); 00326 #endif 00327 //JOIN Message 00328 block_data_t m_sid[4] = {JOIN % 256, SID_ % 256, SID_ / 256, hops_ % 256}; 00329 //send the message 00330 radio().send(Radio::BROADCAST_ADDRESS, 4, m_sid); 00331 #ifdef ISENSE_APP 00332 debug().debug("STOP 1"); 00333 #endif 00334 #ifdef DEBUG 00335 //count the message 00336 mess_join_++; 00337 #ifndef ISENSE_APP 00338 debug().debug("SEND JOIN [%d , %d , %d]\n", m_sid[0], m_sid[1] + m_sid[2]*256, m_sid[3]); 00339 #else 00340 debug().debug("SEND JOIN [%d , %x , %d]", m_sid[0], m_sid[1] + m_sid[2]*256, m_sid[3]); 00341 #endif 00342 #endif 00343 00344 //set timer for expiry and clustering end 00345 timer().template set_timer<self_t, 00346 &self_t::timer_expired > (time_slice_, this, (void*) 0); 00347 00348 } 00349 00350 } 00351 00352 template<typename OsModel_P, 00353 typename Radio_P, typename Timer_P, typename Debug_P> 00354 void 00355 BfsClustering<OsModel_P, Radio_P, Timer_P, Debug_P>:: 00356 disable(void) { 00357 radio().unreg_recv_callback(callback_id_); 00358 } 00359 00360 template<typename OsModel_P, 00361 typename Radio_P, typename Timer_P, typename Debug_P> 00362 void 00363 BfsClustering<OsModel_P, Radio_P, Timer_P, Debug_P>:: 00364 receive(node_id_t from, size_t len, block_data_t* data) { 00365 00366 // drop all own messages 00367 if (from == radio().id()) return; 00368 00369 // copy the message to local memmory 00370 block_data_t recvm[len]; 00371 memcpy(recvm, data, len); 00372 00373 00374 if (recvm[0] == JOIN) { 00375 #ifndef ISENSE_APP 00376 debug().debug("RECEIVED JOIN Node %d <- %d \n", radio().id(), from); 00377 #else 00378 debug().debug("RECEIVED JOIN Node %x <- %x ", radio().id(), from); 00379 #endif 00380 if (cluster_head_) return; 00381 // if not inside a cluster yet 00382 if (SID_ == -1) { 00383 // join the cluster from the message 00384 SID_ = recvm[1] + 256 * recvm[2]; 00385 // set the parent from the message sender 00386 parent_ = from; 00387 // set the hops from the message 00388 hops_ = recvm[3] + 1; 00389 00390 //JOIN_ACCEPT message 00391 block_data_t mess[3] = {JOIN_ACCEPT , radio().id() % 256, radio().id() / 256}; 00392 //do send the message 00393 radio().send(from, 3, mess); 00394 00395 #ifdef DEBUG 00396 //increase the message counter 00397 mess_join_acc_++; 00398 #endif 00399 #ifndef ISENSE_APP 00400 debug().debug("SEND JOIN_ACCEPT Node %d -> %d \n", radio().id(), from); 00401 #else 00402 debug().debug("SEND JOIN_ACCEPT Node %x -> %x ", radio().id(), from); 00403 #endif 00404 00405 // notify for joining the cluster 00406 if (state_changed_callback_) state_changed_callback_(NODE_JOINED); 00407 00408 00409 // forward the message to the rest of the network 00410 block_data_t m_sid[4] = {JOIN , SID_ % 256, SID_ / 256, hops_ % 256}; 00411 //di send the message 00412 radio().send(Radio::BROADCAST_ADDRESS, 4, m_sid); 00413 00414 #ifdef DEBUG 00415 //increase the message counter 00416 mess_join_++; 00417 #endif 00418 #ifndef ISENSE_APP 00419 debug().debug("SEND JOIN [%d , %d , %d]\n", m_sid[0], m_sid[1] + m_sid[2]*256, m_sid[3]); 00420 #else 00421 debug().debug("SEND JOIN [%d , %x , %d]", m_sid[0], m_sid[1] + m_sid[2]*256, m_sid[3]); 00422 #endif 00423 00424 //set the timer to check for end of clustering 00425 timer().template set_timer<self_t, 00426 &self_t::timer_expired > (time_slice_, this, (void*) 0); 00427 00428 }// if already inside a cluster ignore message 00429 else { 00430 } 00431 00432 } else if (recvm[0] == JOIN_ACCEPT) { 00433 // if a join accept received 00434 00435 // get the node id from the message 00436 node_id_t node = recvm[1] + recvm[2]*256; 00437 // note that a child joined 00438 any_joined_ = true; 00439 00440 #ifndef ISENSE_APP 00441 debug().debug("RECEIVED JOIN_ACCEPT Node %d <- %d \n", radio().id(), node); 00442 #else 00443 debug().debug("RECEIVED JOIN_ACCEPT Node %x <- %x ", radio().id(), node); 00444 #endif 00445 // add node to the cluster vector 00446 node_joined(node); 00447 00448 if (!cluster_head_) { 00449 //if not a cluster head forward the message to head 00450 radio().send(parent(), len, recvm); 00451 #ifdef DEBUG 00452 //increase the message counter 00453 mess_join_acc_++; 00454 #endif 00455 #ifndef ISENSE_APP 00456 debug().debug("SEND JOIN_ACCEPT Node %d -> %d \n", radio().id(), parent()); 00457 #else 00458 debug().debug("SEND JOIN_ACCEPT Node %x -> %x ", radio().id(), parent()); 00459 #endif 00460 // notify fro the node that joined 00461 if (state_changed_callback_) state_changed_callback_(NODE_JOINED); 00462 } 00463 } else if (recvm[0] == RESUME) { 00464 // if a cluster head the clustering of the branch is finished 00465 if (cluster_head_) { 00466 00467 if (state_changed_callback_) state_changed_callback_(CLUSTER_FORMED); 00468 #ifndef ISENSE_APP 00469 debug().debug("CLUSTER %d FORMED\n", radio().id()); 00470 #else 00471 debug().debug("CLUSTER %x FORMED", radio().id()); 00472 #endif 00473 }// if not a cluster head forward the resume 00474 else { 00475 // build the resume message 00476 size_t mess_size = 1; 00477 block_data_t resume_mess[mess_size]; 00478 resume_mess[0] = RESUME; 00479 //do send the message 00480 radio().send(parent(), mess_size, resume_mess); 00481 #ifdef DEBUG 00482 //increase the message counter 00483 mess_resume_++; 00484 #endif 00485 #ifndef ISENSE_APP 00486 debug().debug("SEND RESUME %d -> %d \n", radio().id(), parent()); 00487 #else 00488 debug().debug("SEND RESUME %x <- %x ", radio().id(), parent()); 00489 #endif 00490 } 00491 } 00492 } 00493 00494 template<typename OsModel_P, 00495 typename Radio_P, typename Timer_P, typename Debug_P> 00496 void 00497 BfsClustering<OsModel_P, Radio_P, Timer_P, Debug_P>:: 00498 timer_expired(void * dat) { 00499 00500 // if no nodes joind start the resume messages 00501 if (!any_joined_) { 00502 // if not a cluster head 00503 if (parent() != radio().id()) { 00504 size_t mess_size = 1; 00505 block_data_t resume_mess[mess_size]; 00506 resume_mess[0] = RESUME; 00507 //do send the message 00508 radio().send(parent(), mess_size, resume_mess); 00509 #ifdef DEBUG 00510 //increase the message counter 00511 mess_resume_++; 00512 #endif 00513 #ifndef ISENSE_APP 00514 debug().debug("SEND RESUME %d -> %d \n", radio().id(), parent()); 00515 #else 00516 debug().debug("SEND RESUME %x <- %x ", radio().id(), parent()); 00517 #endif 00518 }// if a cluster head 00519 else { 00520 00521 if (state_changed_callback_) state_changed_callback_(CLUSTER_FORMED); 00522 #ifndef ISENSE_APP 00523 debug().debug("CLUSTER %d FORMED\n", radio().id()); 00524 #else 00525 debug().debug("CLUSTER %x FORMED", radio().id()); 00526 #endif 00527 } 00528 } 00529 } 00530 } 00531 00532 #endif