Wiselib
wiselib.testing/algorithms/cluster/lca/lca.h
Go to the documentation of this file.
00001 #ifndef __LCA_H_
00002 #define __LCA_H_
00003 
00004 #include "util/delegates/delegate.hpp"
00005 #include "algorithms/cluster/clustering_types.h"
00006 #include "util/base_classes/clustering_base.h"
00007 #include "algorithms/neighbor_discovery/echo.h"
00008 #include "algorithms/neighbor_discovery/pgb_payloads_ids.h"
00009 #include "algorithms/cluster/join_message.h"
00010 #include "algorithms/cluster/join_accept_message.h"
00011 #include "algorithms/cluster/resume_message.h"
00012 #include "algorithms/cluster/reform_message.h"
00013 
00014 #undef DEBUG
00015 // Uncomment to enable Debug
00016 #define DEBUG
00017 #ifdef DEBUG
00018 //#define DEBUG_PAYLOADS
00019 #endif
00020 
00021 namespace wiselib {
00029 template<typename OsModel_P, typename Radio_P, typename HeadDecision_P,
00030       typename JoinDecision_P, typename Iterator_P>
00031 class LcaCore: public ClusteringBase<OsModel_P> {
00032 public:
00033    //TYPEDEFS
00034    typedef OsModel_P OsModel;
00035    // os modules
00036    typedef Radio_P Radio;
00037    typedef typename OsModel::Timer Timer;
00038    typedef typename OsModel::Debug Debug;
00039    typedef typename OsModel::Rand Rand;
00040    //algorithm modules
00041    typedef HeadDecision_P HeadDecision_t;
00042    typedef JoinDecision_P JoinDecision_t;
00043    typedef Iterator_P Iterator_t;
00044    // self_type
00045    typedef LcaCore<OsModel_P, Radio_P, HeadDecision_P, JoinDecision_P,
00046          Iterator_P> self_type;
00047    typedef wiselib::Echo<OsModel, Radio, Timer, Debug> nb_t;
00048 
00049    // data types
00050    typedef int cluster_level_t; //quite useless within current scheme, supported for compatibility issues
00051    typedef typename Radio::node_id_t node_id_t;
00052    typedef typename Radio::size_t size_t;
00053    typedef typename Radio::block_data_t block_data_t;
00054    typedef node_id_t cluster_id_t;
00055 
00056    /*
00057     * Constructor
00058     * */
00059    LcaCore() :
00060       probability_(30), maxhops_(4), enabled_(false), status_(0), round_(0),
00061             auto_reform_(0), reform_(false), head_lost_(false) {
00062    }
00063 
00064    /*
00065     * Destructor
00066     * */
00067    ~LcaCore() {
00068    }
00069 
00070    /*
00071     * INIT
00072     * initializes the values of radio timer and debug
00073     */
00074    inline void init(Radio& radio, Timer& timer, Debug& debug, Rand& rand,
00075          nb_t& neighbor_discovery) {
00076       radio_ = &radio;
00077       timer_ = &timer;
00078       debug_ = &debug;
00079       rand_ = &rand;
00080       neighbor_discovery_ = &neighbor_discovery;
00081       uint8_t flags = nb_t::DROPPED_NB | nb_t::LOST_NB_BIDI
00082             | nb_t::NEW_PAYLOAD_BIDI | nb_t::NB_READY;
00083 
00084       neighbor_discovery_->template reg_event_callback<self_type,
00085             &self_type::neighbor_discovery_callback> (CLUSTERING, flags,
00086             this);
00087       neighbor_discovery_->register_payload_space((uint8_t) CLUSTERING);
00088    }
00089 
00090    // Set IT
00091    void set_iterator(Iterator_t &it) {
00092       it_ = &it;
00093    }
00094 
00095    // Set JD
00096    void set_join_decision(JoinDecision_t &jd) {
00097       jd_ = &jd;
00098    }
00099 
00100    // Set CHD
00101    void set_cluster_head_decision(HeadDecision_t &chd) {
00102       chd_ = &chd;
00103    }
00104 
00105    // Set the CHD probability
00106    void set_probability(int prob) {
00107       probability_ = prob;
00108    }
00109 
00110    // Set the JD maximum hops from CH
00111    void set_maxhops(int maxhops) {
00112       maxhops_ = maxhops;
00113    }
00114 
00115    node_id_t parent() {
00116       return it().parent();
00117    }
00118 
00119    cluster_id_t cluster_id() {
00120       return it().cluster_id();
00121    }
00122 
00123    int node_type() {
00124       return it().node_type();
00125    }
00126 
00127    int hops() {
00128       return it().hops();
00129    }
00130 
00131    /*
00132     * The status Of the Clustering Algorithm
00133     * 1 means a cluster is being formed
00134     * 0 means cluster is formed
00135     */
00136    inline uint8_t status() {
00137       //1 - forming
00138       //0 - formed
00139       if (enabled_) {
00140          return status_;
00141       } else {
00142          return 2;
00143       }
00144    }
00145 
00146    /*
00147     * Returns the Cluster head
00148     * status of the node
00149     */
00150    inline bool is_cluster_head() {
00151       if (node_type() != UNCLUSTERED) {
00152          return cluster_id() == radio().id();
00153       } else {
00154          return false;
00155       }
00156    }
00157 
00158    /* SHOW all known nodes */
00159 
00160    inline void present_neighbors(void) {
00161       if (!status()) {
00162          it().present_neighbors();
00163       }
00164    }
00165 
00166    /*
00167     * Enable
00168     * enables the mbfsclustering module
00169     * enable chd it and jd modules
00170     * initializes values
00171     * registers callbacks
00172     * calls find head to start clustering
00173     * */
00174    void enable() {
00175       if (enabled_)
00176          return;
00177       enabled_ = true;
00178       //set clustering round to 0
00179       round_ = 0;
00180       rand().srand(radio().id());
00181 
00182       //initialize the clustering modules
00183       chd().init(radio(), debug(), rand());
00184       jd().init(radio(), debug());
00185       it().init(radio(), timer(), debug());
00186 
00187 #ifdef DEBUG        
00188       debug().debug("Enable::%x::%d::%d::", radio().id(), probability_,
00189             maxhops_);
00190 #endif
00191 
00192       // receive receive callback
00193       receive_callback_id_ = radio().template reg_recv_callback<self_type,
00194             &self_type::receive> (this);
00195       // set variables of other modules
00196       chd().set_probability(probability_);
00197       jd().set_maxhops(maxhops_);
00198 
00199       timer().template set_timer<self_type, &self_type::form_cluster> (
00200             time_slice_, this, (void *) maxhops_);
00201    }
00202 
00203    /*
00204     * Disable
00205     * disables the bfsclustering module
00206     * unregisters callbacks
00207     * */
00208    void disable() {
00209       // Unregister the callback
00210       radio().unreg_recv_callback(receive_callback_id_);
00211    }
00212 
00213    // Call with a timer to start a reform procedure from the cluster head
00214 
00215    inline void reform_cluster(void * parameter) {
00216       reform_ = true;
00217    }
00218 
00219    // Start the procedure needed to form a cluster
00220 
00221    inline void form_cluster(void * parameter) {
00222       if (!enabled_)
00223          return;
00224 
00225       status_ = 1;
00226 
00227       //enabling
00228       chd().reset();
00229       jd().reset();
00230       it().reset();
00231 
00232       // start the procedure to find new head
00233       //find_head(0);
00234       timer().template set_timer<self_type, &self_type::find_head> (
00235             time_slice_, this, (void *) 0);
00236       // reform is false as cluster is not yet formed
00237       reform_ = false;
00238    }
00239 
00240    /*
00241     * FIND_HEAD
00242     * starts clustering
00243     * decides a head and then start clustering
00244     * from the head of each cluster
00245     * */
00246    void find_head(void *) {
00247       if (chd().calculate_head() == true) {
00248          // set values for iterator and join_decision
00249          it().set_parent(radio().id());
00250          it().set_cluster_id(radio().id());
00251          jd().set_cluster_id(radio().id());
00252          chd().set_probability(probability_);
00253 
00254          // inform for state change
00255          this->state_changed(CLUSTER_HEAD_CHANGED);
00256 
00257          if (auto_reform_ > 0) {
00258             timer().template set_timer<self_type,
00259                   &self_type::reform_cluster> (
00260                   auto_reform_ * time_slice_, this, (void *) maxhops_);
00261          }
00262          status_ = 0;
00263 
00264          JoinClusterMsg<OsModel, Radio> msg =
00265                jd().get_join_request_payload();
00266          // send JOIN
00267          radio().send(Radio::BROADCAST_ADDRESS, msg.length(),
00268                (block_data_t *) &msg);
00269 #ifdef DEBUG
00270          debug().debug("Send::%x::%d::%x", radio().id(), msg.msg_id(),
00271                Radio::BROADCAST_ADDRESS);
00272 #endif
00273 
00274          //Check after some time if Any accept messages were received
00275          //2*time_slice for messages to be sent and received
00276          timer().template set_timer<self_type, &self_type::timer_expired> (2
00277                * maxhops_ * time_slice_, this, 0);
00278       } else {
00279          timer().template set_timer<self_type, &self_type::wait_for_joins> (
00280                maxhops_ * time_slice_, this, 0);
00281       }
00282    }
00283 
00284    void wait_for_joins(void * data) {
00285       head_lost_ = false;
00286 
00287       // if noone aroung as cluster head
00288       // become a cluster head and search for nodes
00289       if (it().node_type() == UNCLUSTERED) {
00290 #ifdef DEBUG
00291          debug().debug("Not clustered yet, Start own Cluster %x",
00292                radio().id());
00293 #endif
00294          //become a cluster head - set probability to 100%
00295          //chd().set_probability(100);
00296          // start clustering
00297          find_head(0);
00298       } else {
00299          if (jd().hops() < maxhops_) {
00300             JoinClusterMsg<OsModel, Radio> msg =
00301                   jd().get_join_request_payload();
00302             // Forward message from previous node
00303             radio().send(Radio::BROADCAST_ADDRESS, msg.length(),
00304                   (block_data_t *) &msg);
00305 #ifdef DEBUG
00306             debug().debug("Send::%x::%d::%x::%d::", radio().id(),
00307                   msg.msg_id(), Radio::BROADCAST_ADDRESS, msg.length());
00308 #endif
00309 
00310             //set the timer to check for clustering end
00311             timer().template set_timer<self_type, &self_type::timer_expired> (
00312                   time_slice_, this, (void*) 0);
00313          } else {
00314             // if no more hops to propagate the join message finalize cluster formation
00315             timer_expired(0);
00316          }
00317          //notify for join
00318          this->state_changed(NODE_JOINED);
00319       }
00320    }
00321 
00322    /*
00323     * TIMER_EXPIRED
00324     * if timer_expired is called and no
00325     * join messages were received by non CH nodes
00326     * node becomes a CH and starts its ow cluster
00327     * */
00328    inline void timer_expired(void * timer_value) {
00329       // if none joind under the node
00330       if (!it().any_joined()) {
00331          // if not a cluster head
00332          if (it().node_type() != HEAD) {
00333             // create a resume message
00334             ResumeClusterMsg<OsModel, Radio> msg =
00335                   it().get_resume_payload();
00336             //do send the message
00337             radio().send(it().parent(), msg.length(), (block_data_t *) &msg);
00338 
00339 #ifdef DEBUG
00340             debug().debug("Send::%x::%d::%x::", radio().id(), msg.msg_id(),
00341                   it().parent());
00342 #endif
00343          }// if a cluster head end the clustering under this branch
00344          else {
00345             this->state_changed(CLUSTER_FORMED);
00346          }
00347          status_ = 0;
00348       }
00349    }
00350 
00351 protected:
00352 
00353    void neighbor_discovery_callback(uint8_t event, node_id_t from,
00354          uint8_t len, uint8_t* data) {
00355       if (nb_t::NEW_PAYLOAD_BIDI == event) {
00356          receive_beacon(from, len, data);
00357          //reset my beacon according to the new status
00358          uint8_t buf[beacon_size()];
00359          get_beacon(buf);
00360          if (neighbor_discovery_->set_payload((uint8_t) CLUSTERING, buf,
00361                beacon_size()) != 0) {
00362 #ifdef DEBUG
00363             debug_->debug("Error::%x::", radio_->id());
00364 #endif
00365          }
00366       } else if ((nb_t::LOST_NB_BIDI == event) || (nb_t::DROPPED_NB == event)) {
00367          node_lost(from);
00368 #ifdef DEBUG
00369          debug().debug("Drop::%x::%x::", radio().id(), from);
00370 #endif
00371       } else if (nb_t::NB_READY == event) {
00372          // when neighborhood is ready start clustering
00373          enable();
00374          uint8_t buf[beacon_size()];
00375          get_beacon(buf);
00376          if (neighbor_discovery_->set_payload((uint8_t) CLUSTERING, buf,
00377                beacon_size()) != 0) {
00378 #ifdef DEBUG
00379             debug_->debug("Error::%x::", radio_->id());
00380 #endif
00381          }
00382       }
00383    }
00384 
00385    /*
00386     * Size of the payload to the ND module beacon
00387     */
00388    size_t beacon_size() {
00389       JoinClusterMsg<OsModel, Radio> msg;
00390       //send a new join message using the beacon
00391       return msg.length();
00392    }
00393 
00394    /*
00395     * Receive a beacon payload
00396     * check for new head if needed
00397     * check if in need to reform
00398     */
00399    void receive_beacon(node_id_t node_from, size_t len, uint8_t * data) {
00400       //receive the beacon data
00401       JoinClusterMsg<OsModel, Radio> msg;
00402       memcpy(&msg, data, len);
00403       node_id_t cluster = msg.cluster_id();
00404       int hops = msg.hops();
00405 
00406       //if the connection to the cluster head was lost
00407       if (head_lost_) {
00408 
00409          //if the beacon came from a cluster head
00410          if (node_from == cluster) {
00411 
00412             // join him
00413             // inform iterator about the new cluster
00414             it().set_parent(node_from);
00415             it().set_cluster_id(cluster);
00416             jd().set_cluster_id(cluster);
00417             it().set_hops(hops);
00418             jd().set_hops(hops);
00419             it().set_node_type(SIMPLE);
00420             it().node_joined(node_from);
00421 
00422             // if joined , node state changed
00423             this->state_changed(NODE_JOINED);
00424 
00425             //mark that the head_lost_ situation was resolved
00426             head_lost_ = false;
00427             timer_expired(0);
00428          }
00429       }
00430 
00431       //SET the node lists accordingly
00432       if (cluster == radio().id()) {
00433          it().node_joined(node_from);
00434       } else {
00435          it().node_not_joined(node_from);
00436       }
00437 
00438       //if message was sent from a cluster head
00439       if (node_from == cluster) {
00440 
00441          //debug().debug("Got A Beacon node :%x from :%x status:%x\n",radio().id(),node_from,status_);
00442 
00443          /*       // if the messages says reform and it was sent by my
00444           // cluster head , and i am not already reforming
00445           if ((reform) && (status() == 0)) {
00446           //if ((reform)&&(node_from == cluster_id())&&(!status_)){
00447           status_ = 1;
00448           #ifdef DEBUG
00449           debug().debug("Reform::%x", radio().id());
00450           #endif
00451           //timer().template set_timer<self_type, &self_type::form_cluster > (time_slice_*0.9, this, (void *) maxhops_ );
00452           form_cluster((void*) maxhops_);
00453 
00454           } else*/
00455          if (is_cluster_head()) {
00456             if (it().node_count(1) == 0) {
00457                if (cluster < cluster_id()) {
00458                   debug().debug("Orphan::%x", radio().id());
00459                   // join him
00460                   // inform iterator about the new cluster
00461                   it().set_parent(node_from);
00462                   it().set_cluster_id(cluster);
00463                   jd().set_cluster_id(cluster);
00464                   it().set_hops(hops);
00465                   jd().set_hops(hops);
00466                   it().set_node_type(SIMPLE);
00467                   it().node_joined(node_from);
00468 
00469                   // if joined , node state changed
00470                   this->state_changed(NODE_JOINED);
00471 
00472                   //create the resyme message
00473                   ResumeClusterMsg<OsModel, Radio> msg =
00474                         it().get_resume_payload();
00475                   //do send the message
00476                   radio().send(it().parent(), msg.length(),
00477                         (block_data_t *) &msg);
00478 #ifdef DEBUG
00479                   debug().debug("Send::%x::%d::%x::%d::", radio().id(),
00480                         msg.msg_id(), it().parent(), msg.length());
00481 #endif
00482                }
00483             }
00484          }
00485       } else {
00486          //if the sender was my cluster head and is no more a CH
00487          if (node_from == cluster_id()) {
00488             node_lost(cluster_id());
00489          }
00490       }
00491    }
00492 
00493    /*
00494     * Get a payload
00495     * to save on a beacon message
00496     */
00497    void get_beacon(uint8_t * mess) {
00498       JoinClusterMsg<OsModel, Radio> msg = jd().get_join_request_payload();
00499       memcpy(mess, &msg, msg.length());
00500    }
00501 
00502    /*
00503     * RECEIVE
00504     * respond to the new messages received
00505     * callback from the radio
00506     * */
00507    void receive(node_id_t from, size_t len, block_data_t *data) {
00508 
00509       // drop own messages
00510       if (radio().id() == from)
00511          return;
00512       if (!neighbor_discovery_->is_neighbor_bidi(from))
00513          return;
00514 
00515       // get Type of Message
00516       uint8_t type = *data;
00517 
00518       // type=JOIN
00519       if (type == JOIN) {
00520 #ifdef RECEIVE_DEBUG
00521          debug().debug("RECEIVED JOIN Node %x <- %x\n", radio().id(), from);
00522 #endif
00523          if (node_type() == HEAD)
00524             return;
00525          // try to join
00526          if (jd().join(data, len)) {
00527             // set values for iterator and join_decision
00528             it().set_parent(from);
00529             it().set_cluster_id(jd().cluster_id());
00530             it().set_hops(jd().hops());
00531             it().set_node_type(SIMPLE);
00532             it().node_joined(from);
00533             this->state_changed(NODE_JOINED);
00534          }
00535       } else if (type == RESUME) {
00536 #ifdef RECEIVE_DEBUG
00537          debug().debug("RECEIVED RESUME Node %x <- %x\n", radio().id(), from);
00538 #endif
00539       }
00540    }
00541 
00542    /*
00543     * Called when ND lost contact with a node
00544     * If the node was cluster head
00545     *  - start searching for new head
00546     * else
00547     *  - remove node from known nodes
00548     */
00549    inline void node_lost(node_id_t node) {
00550       //If the node was my CH
00551       if (node == cluster_id()) {
00552          //Reset Iterator
00553          it().reset();
00554          //Mark as headless
00555          head_lost_ = true;
00556          //Timeout for new CH beacons
00557          timer().template set_timer<self_type, &self_type::wait_for_joins> (
00558                maxhops_ * time_slice_, this, 0);
00559       } else {
00560          //if not my CH
00561          //Remove from Iterator
00562          it().drop_node(node);
00563       }
00564    }
00565 
00566 private:
00567    int receive_callback_id_; // receive message callback
00568    int probability_; // clustering parameter
00569    int maxhops_;
00570    nb_t * neighbor_discovery_;
00571    bool enabled_;
00572    uint8_t status_; // the status of the clustering algorithm
00573    static const uint32_t time_slice_ = 2000; // time to wait for cluster accept replies
00574    int round_;
00575    int auto_reform_; //time to autoreform the clusters
00576    bool reform_; // flag to start reforming
00577    bool head_lost_; // flag when the head was lost
00578 
00579    /* CLustering algorithm modules */
00580    HeadDecision_t * chd_;
00581    JoinDecision_t * jd_;
00582    Iterator_t * it_;
00583 
00584    Iterator_t& it() {
00585       return *it_;
00586    }
00587 
00588    JoinDecision_t& jd() {
00589       return *jd_;
00590    }
00591 
00592    HeadDecision_t& chd() {
00593       return *chd_;
00594    }
00595 
00596    Radio * radio_; // radio module
00597    Timer * timer_; // timer module
00598    Debug * debug_; // debug module
00599    Rand * rand_;
00600 
00601    Radio& radio() {
00602       return *radio_;
00603    }
00604 
00605    Timer& timer() {
00606       return *timer_;
00607    }
00608 
00609    Debug& debug() {
00610       return *debug_;
00611    }
00612 
00613    Rand& rand() {
00614       return *rand_;
00615    }
00616 };
00617 }
00618 #endif
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines