Wiselib
|
00001 #ifndef _MOCA_MocaCore_H 00002 #define _MOCA_MocaCore_H 00003 00004 #include "algorithms/cluster/clustering_types.h" 00005 #include "util/base_classes/clustering_base.h" 00006 00007 #undef DEBUG 00008 //Uncomment to enable debug 00009 #define DEBUG 00010 00011 namespace wiselib { 00012 00020 template<typename OsModel_P, 00021 typename HeadDecision_P, 00022 typename JoinDecision_P, 00023 typename Iterator_P> 00024 class MocaCore 00025 : public ClusteringBase <OsModel_P> { 00026 public: 00027 //TYPEDEFS 00028 typedef OsModel_P OsModel; 00029 // os modules 00030 typedef typename OsModel::Radio Radio; 00031 typedef typename OsModel::Timer Timer; 00032 typedef typename OsModel::Debug Debug; 00033 typedef typename OsModel::Rand Rand; 00034 //algorithm modules 00035 typedef HeadDecision_P HeadDecision_t; 00036 typedef JoinDecision_P JoinDecision_t; 00037 typedef Iterator_P Iterator_t; 00038 // self t 00039 typedef MocaCore<OsModel_P, HeadDecision_P, JoinDecision_P, Iterator_P > self_t; 00040 00041 // data types 00042 typedef int cluster_level_t; //quite useless within current scheme, supported for compatibility issues 00043 typedef typename Radio::node_id_t node_id_t; 00044 typedef typename Radio::size_t size_t; 00045 typedef typename Radio::block_data_t block_data_t; 00046 typedef node_id_t cluster_id_t; 00047 00048 // delegate 00049 typedef delegate1<void, int> cluster_delegate_t; 00050 00051 /* 00052 * Constructor 00053 * */ 00054 MocaCore() : 00055 probability_(30), 00056 maxhops_(3) { 00057 } 00058 00059 /* 00060 * Destructor 00061 * */ 00062 ~MocaCore() { 00063 } 00064 00065 /* 00066 * INIT 00067 * initializes the values of radio timer and debug 00068 */ 00069 void init(Radio& radio, Timer& timer, Debug& debug) { 00070 radio_ = &radio; 00071 timer_ = &timer; 00072 debug_ = &debug; 00073 } 00074 00075 00076 /* set functions */ 00077 00078 /* SET the clustering modules */ 00079 00080 // Set the iterator Module 00081 00082 void set_iterator(Iterator_t &it) { 00083 it_ = ⁢ 00084 } 00085 00086 // Set the join_decision Module 00087 00088 void set_join_decision(JoinDecision_t &jd) { 00089 jd_ = &jd; 00090 } 00091 00092 // Set the cluster_head_decision Module 00093 00094 void set_cluster_head_decision(HeadDecision_t &chd) { 00095 chd_ = &chd; 00096 } 00097 00098 // Set the theta value 00099 00100 void set_probability(int prob) { 00101 probability_ = prob; 00102 } 00103 00104 // Set the maxhops value 00105 00106 void set_maxhops(int maxhops) { 00107 maxhops_ = maxhops; 00108 } 00109 00110 /* GET functions 00111 00112 //get the parent value 00113 */ 00114 node_id_t parent(cluster_id_t cluster_id = 0) { 00115 return it().parent(cluster_id); 00116 } 00117 00118 //get the cluster id 00119 00120 cluster_id_t cluster_id(size_t cluster_no = 0) { 00121 return it().cluster_id(cluster_no); 00122 } 00123 00124 //get the hops from the cluster head 00125 00126 size_t hops(cluster_id_t cluster_id = 0) { 00127 return it().hops(cluster_id); 00128 } 00129 00130 /* SHOW all the known nodes */ 00131 00132 void present_neighbors() { 00133 it().present_neighbors(); 00134 } 00135 00136 int node_type() { 00137 return it().node_type(); 00138 } 00139 00140 /* CALLBACKS */ 00141 00142 /*template<class T, void(T::*TMethod)(int) > 00143 inline int reg_state_changed_callback(T *obj_pnt) { 00144 state_changed_callback_ = cluster_delegate_t::from_method<T, TMethod > (obj_pnt); 00145 return state_changed_callback_; 00146 } 00147 00148 void unreg_changed_callback(int idx) { 00149 state_changed_callback_ = cluster_delegate_t(); 00150 } 00151 */ 00152 00153 00154 /* 00155 * Enable 00156 * enables the mbfsclustering module 00157 * enable chd it and jd modules 00158 * initializes values 00159 * registers callbacks 00160 * calls find head to start clustering 00161 * */ 00162 void enable(); 00163 00164 /* 00165 * Disable 00166 * disables the bfsclustering module 00167 * unregisters callbacks 00168 * */ 00169 void disable(); 00170 00171 /* 00172 * FIND_HEAD 00173 * starts clustering 00174 * decides a head and then start clustering 00175 * from the head of each cluster 00176 * */ 00177 void find_head(); 00178 00179 /* 00180 * RECEIVE_NEXT 00181 * 00182 * */ 00183 void receive_next(int num); 00184 00185 /* 00186 * TIMER_EXPIRED 00187 * if timer_expired is called 00188 * clustering procedure ends 00189 * */ 00190 void timer_expired(void *); 00191 00192 void reply_to_head(void *); 00193 00194 #ifdef DEBUG 00195 00196 int mess_join() { 00197 return mess_join_; 00198 } 00199 00200 int mess_join_request() { 00201 return mess_join_request_; 00202 } 00203 #endif 00204 00205 protected: 00206 /* 00207 * RECEIVE 00208 * respond to the new messages received 00209 * callback from the radio 00210 * */ 00211 void receive(node_id_t receiver, size_t len, block_data_t *data); 00212 00213 private: 00214 00215 int callback_id_; // receive message callback 00216 //cluster_delegate_t state_changed_callback_; // state changed callback to processor 00217 int next_callback_id_; // unused 00218 00219 int probability_; // clustering parameter 00220 int maxhops_; 00221 static const int time_slice_ = 1000; // time to wait for cluster accept replies 00222 00223 00224 /* CLustering algorithm modules */ 00225 HeadDecision_t * chd_; 00226 00227 HeadDecision_t& chd() { 00228 return *chd_; 00229 } 00230 JoinDecision_t * jd_; 00231 00232 JoinDecision_t& jd() { 00233 return *jd_; 00234 } 00235 Iterator_t * it_; 00236 00237 Iterator_t& it() { 00238 return *it_; 00239 } 00240 00241 #ifdef DEBUG 00242 int mess_join_; 00243 int mess_join_request_; 00244 #endif 00245 00246 00247 Radio * radio_; // radio module 00248 Timer * timer_; // timer module 00249 Debug * debug_; // debug module 00250 Rand * rand_; 00251 00252 Radio& radio() { 00253 return *radio_; 00254 } 00255 00256 Timer& timer() { 00257 return *timer_; 00258 } 00259 00260 Debug& debug() { 00261 return *debug_; 00262 } 00263 00264 Rand& rand() { 00265 return *rand_; 00266 } 00267 00268 00269 }; 00270 00271 template<typename OsModel_P, typename HeadDecision_P, typename JoinDecision_P, typename Iterator_P> 00272 void 00273 MocaCore<OsModel_P, HeadDecision_P, JoinDecision_P, Iterator_P>:: 00274 enable(void) { 00275 00276 //enable the radio 00277 radio().enable_radio(); 00278 00279 //initialize the clustering modules 00280 chd().init(radio(), debug(), rand()); 00281 jd().init(radio(), debug()); 00282 it().init(radio(), timer(), debug()); 00283 00284 //reset_mess_counters(); 00285 00286 #ifdef DEBUG 00287 debug().debug("Enable node %x \n", radio().id()); 00288 #endif 00289 00290 // receive receive callback 00291 callback_id_ 00292 = radio().template reg_recv_callback<self_t, &self_t::receive > (this); 00293 00294 00295 00296 // register next callback 00297 //next_callback_id_ = it().template reg_next_callback<self_t, 00298 // &self_t::receive_next > (this); 00299 00300 //enabling 00301 chd().enable(); 00302 jd().enable(); 00303 it().enable(); 00304 00305 // set variables of other modules 00306 chd().set_probability(probability_); 00307 jd().set_maxhops(maxhops_); 00308 00309 chd().set_id(radio().id()); 00310 jd().set_id(radio().id()); 00311 it().set_id(radio().id()); 00312 00313 00314 // start the procedure to find new head 00315 find_head(); 00316 } 00317 ; 00318 00319 template<typename OsModel_P, typename HeadDecision_P, typename JoinDecision_P, typename Iterator_P> 00320 void MocaCore<OsModel_P, HeadDecision_P, JoinDecision_P, Iterator_P> 00321 ::find_head(void) { 00322 #ifdef DEBUG 00323 debug().debug("stage 1 - Cluster head decision\n"); 00324 #endif 00325 // if Cluster Head 00326 if (chd().calculate_head() == true) { 00327 00328 // set values for iterator and join_decision 00329 it().set_node_type(HEAD); 00330 //it().set_cluster_id(radio().id()); 00331 //jd().set_cluster_id(radio().id()); 00332 00333 jd().set_id(radio().id()); 00334 00335 #ifdef DEBUG 00336 debug().debug("stage 2 - Join\n"); 00337 #endif 00338 00339 //jd(). get join payload 00340 int mess_size = jd().get_payload_length(JOIN); 00341 00342 block_data_t m_sid[mess_size]; 00343 00344 jd().get_join_request_payload(m_sid); 00345 00346 // send JOIN 00347 radio().send(Radio::BROADCAST_ADDRESS, mess_size, m_sid); 00348 #ifdef DEBUG 00349 cluster_id_t cluster; 00350 uint8_t hops; 00351 memcpy(&cluster, m_sid + 1, sizeof (cluster_id_t)); 00352 memcpy(&hops, m_sid + 1 + sizeof (cluster_id_t), sizeof (uint8_t)); 00353 debug().debug("SEND JOIN Node %x, [%x,%d]\n", radio().id(), cluster, hops); 00354 mess_join_++; 00355 #endif 00356 00357 // Cluster is head now 00358 //this->state_changed(CLUSTER_HEAD_CHANGED); 00359 00360 00361 00362 //set the time to check for finished clustering 00363 timer().template set_timer<self_t, 00364 &self_t::timer_expired > (2 * maxhops_ * time_slice_, this, (void*) 0); 00365 00366 } else { 00367 timer().template set_timer<self_t, 00368 &self_t::reply_to_head > (maxhops_ * time_slice_, this, (void*) 0); 00369 00370 } 00371 00372 } 00373 ; 00374 00375 template<typename OsModel_P, typename HeadDecision_P, typename JoinDecision_P, typename Iterator_P> 00376 void MocaCore<OsModel_P, HeadDecision_P, JoinDecision_P, Iterator_P> 00377 ::disable(void) { 00378 // Unregister the callback 00379 radio().unreg_recv_callback(callback_id_); 00380 it().unreg_next_callback(next_callback_id_); 00381 } 00382 00383 template<typename OsModel_P, typename HeadDecision_P, typename JoinDecision_P, typename Iterator_P> 00384 void MocaCore<OsModel_P, HeadDecision_P, JoinDecision_P, Iterator_P> 00385 ::receive(node_id_t from, size_t len, block_data_t* data) { 00386 if (from == radio().id()) return; 00387 00388 // get Type of Message 00389 int type = data[0]; 00390 00391 if (type == JOIN) { 00392 if (it().node_type() != HEAD) { 00393 if (jd().join(data, len)) { 00394 cluster_id_t cluster; 00395 memcpy(&cluster, data + 1, sizeof (cluster_id_t)); 00396 uint8_t hops; 00397 memcpy(&hops, data + 1 + sizeof (cluster_id_t), 1); 00398 if (it().add_cluster(cluster, hops, from)) { 00399 #ifdef DEBUG 00400 debug().debug("Node %x Joined Cluster %x, %d hops away\n", radio().id(), cluster, hops); 00401 #endif 00402 00403 hops++; 00404 uint8_t newjoin[len]; 00405 memcpy(newjoin, data, len); 00406 memcpy(newjoin + 1 + sizeof (cluster_id_t), (void *) & hops, 1); 00407 00408 if (hops <= maxhops_) { 00409 radio().send(Radio::BROADCAST_ADDRESS, len, newjoin); 00410 #ifdef DEBUG 00411 debug().debug("SEND JOIN Node %x [%x,%d]\n", radio().id(), cluster, hops); 00412 mess_join_++; 00413 #endif 00414 } 00415 } 00416 } 00417 } else { 00418 cluster_id_t cluster; 00419 memcpy(&cluster, data + 1, sizeof (cluster_id_t)); 00420 uint8_t hops; 00421 memcpy(&hops, data + 1 + sizeof (cluster_id_t), 1); 00422 if (it().add_cluster(cluster, hops, from)) { 00423 #ifdef DEBUG 00424 debug().debug("Node %x knows Cluster %x, %d hops away\n", radio().id(), cluster, hops); 00425 #endif 00426 hops++; 00427 uint8_t newjoin[len]; 00428 memcpy(newjoin, data, len); 00429 memcpy(newjoin + 1 + sizeof (cluster_id_t), (void *) & hops, 1); 00430 00431 if (hops <= maxhops_) { 00432 00433 radio().send(Radio::BROADCAST_ADDRESS, len, newjoin); 00434 #ifdef DEBUG 00435 debug().debug("SEND JOIN Node %x [%x,%d]\n", radio().id(), cluster, hops); 00436 mess_join_++; 00437 #endif 00438 } 00439 } 00440 00441 00442 } 00443 } else if (type == JOIN_REQUEST) { 00444 debug().debug("Join req\n"); 00445 if (!it().eat_request(len, data)) { 00446 node_id_t original_sender; 00447 memcpy(&original_sender, data + 1, sizeof (node_id_t)); 00448 cluster_id_t mess_cluster; 00449 memcpy(&mess_cluster, data + 1 + sizeof (node_id_t), sizeof (cluster_id_t)); 00450 node_id_t dest = it().parent(mess_cluster); 00451 #ifdef DEBUG 00452 debug().debug("To resend %x , cluster %x from %x sender %x \n", radio().id(), mess_cluster, from, original_sender); 00453 #endif 00454 radio().send(dest, len, data); 00455 #ifdef DEBUG 00456 debug().debug("SEND JOIN_REQUEST %x -> %x\n", radio().id(), dest); 00457 mess_join_request_++; 00458 #endif 00459 } 00460 } 00461 } 00462 00463 template<typename OsModel_P, typename HeadDecision_P, typename JoinDecision_P, typename Iterator_P> 00464 void MocaCore<OsModel_P, HeadDecision_P, JoinDecision_P, Iterator_P> 00465 ::timer_expired(void * d) { 00466 #ifdef DEBUG 00467 debug().debug("Clustering Finished %x \n", radio().id()); 00468 #endif 00469 //it().present_neighbors(); 00470 } 00471 00472 template<typename OsModel_P, typename HeadDecision_P, typename JoinDecision_P, typename Iterator_P> 00473 void MocaCore<OsModel_P, HeadDecision_P, JoinDecision_P, Iterator_P> 00474 ::reply_to_head(void * d) { 00475 #ifdef DEBUG 00476 debug().debug("stage 3 - Reply\n"); 00477 //it().present_neighbors(); 00478 #endif 00479 00480 if (it().clusters_joined() < 1) { 00481 #ifdef DEBUG 00482 debug().debug("Node Joined no cluster, change to cluster_head\n"); 00483 debug().debug("NODE UNCOVERED\n"); 00484 00485 #endif 00486 it().set_node_type(HEAD); 00487 timer().template set_timer<self_t, 00488 &self_t::timer_expired > (maxhops_ * time_slice_, this, (void*) 0); 00489 00490 } else { 00491 #ifdef DEBUG 00492 debug().debug("Reply_to_head Node %x, %d clusters\n", radio().id(), it().clusters_joined()); 00493 #endif 00494 for (size_t i = 0; i < it().clusters_joined(); i++) { 00495 00496 cluster_id_t cluster_id = it().cluster_id(i); 00497 node_id_t dest = it().parent(cluster_id); 00498 size_t mess_size = it().get_payload_length(JOIN_REQUEST, cluster_id); 00499 block_data_t mess[mess_size]; 00500 #ifdef DEBUG 00501 debug().debug("SEND JOIN_REQUEST %x -> %x ", radio().id(), dest); 00502 mess_join_request_++; 00503 #endif 00504 it().get_join_request_payload(mess, cluster_id); 00505 00506 radio().send(dest, mess_size, mess); 00507 } 00508 00509 } 00510 00511 } 00512 } 00513 #endif /* _MOCA_MocaCore_H */ 00514