Wiselib
|
00001 /* 00002 * File: aggregation.h 00003 * Author: Koninis 00004 * 00005 * Created on January 22, 2011, 1:16 PM 00006 */ 00007 00008 #ifndef AGGREGATION_H 00009 #define AGGREGATION_H 00010 00011 //wiselib includes 00012 #include "algorithms/routing/tree/tree_routing.h" 00013 #include "util/delegates/delegate.hpp" 00014 #include "util/pstl/vector_static.h" 00015 #include "util/pstl/pair.h" 00016 #include "aggregate.h" 00017 #include "aggregationmsg.h" 00018 00019 #define DEBUG_AGGREGATION 00020 00021 namespace wiselib { 00022 00023 template<typename OsModel_P, typename Radio_P,typename Debug_P, 00024 typename Cluster_P, typename Aggregate_P, typename Routing_P> 00025 class Aggregation { 00026 public: 00027 // Type definitions 00028 typedef OsModel_P OsModel; 00029 typedef Radio_P Radio; 00030 typedef Debug_P Debug; 00031 typedef Cluster_P Cluster; 00032 00033 typedef typename OsModel_P::Clock Clock; 00034 typedef typename OsModel_P::Timer Timer; 00035 typedef Aggregate_P Aggregate_t; 00036 typedef typename Aggregate_t::value_t Aggregate_value_t; 00037 00038 typedef Routing_P tree_routing_t; 00039 00040 typedef AggregateMsg<OsModel,Radio> msg_t; 00041 00042 typedef typename Radio::node_id_t node_id_t; 00043 typedef typename Radio::size_t size_t; 00044 typedef typename Radio::block_data_t block_data_t; 00045 typedef typename Radio::message_id_t message_id_t; 00046 typedef typename Clock::time_t time_t; 00047 00048 typedef typename Radio::ExtendedData ExData; 00049 typedef typename Radio::TxPower TxPower; 00050 00051 // typedef EchoMsg<OsModel, Radio> EchoMsg_t; 00052 typedef Aggregation<OsModel_P, Radio_P, Debug_P, Cluster_P, Aggregate_P, Routing_P> self_t; 00053 TxPower power; 00054 00055 // Vector containing the aggregates that the node is going to combine. 00056 typedef wiselib::vector_static<OsModel, Aggregate_t, 10> aggregates_vector_t; 00057 00058 // Iterators for the aggregates_vector_t 00059 typedef typename aggregates_vector_t::iterator iterator_t; 00060 00064 aggregates_vector_t aggregates_vector; 00065 00066 typedef delegate4<void, uint8_t, node_id_t, uint8_t, uint8_t*> 00067 event_notifier_delegate_t; 00068 // typedef status_delegate_t radio_delegate_t; 00069 00070 00071 enum status_codes { 00072 RUNNING = 0, 00073 WAITING = 1, 00074 RECEIVING_VALUES = 2 00075 }; 00076 00077 // -------------------------------------------------------------------- 00078 enum node_roles { 00079 LEAF_NODE = 0, 00080 NORMAL_NODE = 1, 00081 INTERMEDIATE_NODE = 2, 00082 SINK = 3 00083 }; 00084 00088 Aggregation() { 00089 set_role(NORMAL_NODE); 00090 set_status(RECEIVING_VALUES); 00091 }; 00092 00093 /* 00094 * Destructor 00095 */ 00096 ~Aggregation() { 00097 }; 00098 00102 void init (Radio& radio, Timer& timer, Clock& clock, Debug& debug, Cluster& cluster, tree_routing_t& tree) { 00103 radio_ = &radio; 00104 timer_ = &timer; 00105 clock_ = &clock; 00106 debug_ = &debug; 00107 cluster_ = &cluster; 00108 tree_routing_ = &tree; 00109 }; 00110 00111 /* 00112 * Enable the Aggregation system 00113 * enable radio and register receive callback 00114 * initialize vectors 00115 * change status to SEARCHING 00116 * start sending hello messages 00117 * */ 00118 void enable() { 00119 00120 //enable normal radio 00121 radio().enable_radio(); 00122 recv_callback_id_ =radio().template reg_recv_callback<self_t, 00123 &self_t::receive > ( this); 00124 00125 // initialize vectors and variables 00126 init_aggregation(); 00127 }; 00128 00129 // -------------------------------------------------------------------- 00130 00131 /* 00132 * Disable the Aggregation system 00133 * */ 00134 void disable() { 00135 // radio().disable_radio(); 00136 }; 00137 00141 void init_aggregation() { 00142 //blah blah 00143 //Klatu barada nikto 00144 }; 00145 00151 Aggregate_t combine_aggregates(Aggregate_t agg1, Aggregate_t agg2) { 00152 return agg1.combine(agg2); 00153 }; 00154 00155 Aggregate_t combine_all_aggregates() { 00156 iterator_t next_aggregate = aggregates_vector.begin(); 00157 Aggregate_t result; 00158 00159 if (aggregates_vector.size() == 1) return *next_aggregate; 00160 00161 for (; next_aggregate != aggregates_vector.end(); next_aggregate++) { 00162 // debug().debug("agg::combine_all_aggregates::%d %d %d %d",radio().id(),result.get(),(*next_aggregate).get(),aggregates_vector.size()); 00163 result = combine_aggregates(result, *next_aggregate); 00164 // debug().debug("%d\n",result.get()); 00165 } 00166 return result; 00167 }; 00168 00173 void extract_aggregates() { 00174 00175 }; 00176 00177 uint16_t msgs_count() { 00178 return msgs_stats.aggregation_msg_count; 00179 }; 00180 00181 uint32_t msgs_size() { 00182 return msgs_stats.aggregation_msg_size; 00183 }; 00184 00185 void send(Aggregate_t aggregate) { 00186 00187 if (node_role == SINK) { 00188 aggregates_vector.push_back(aggregate); 00189 // extract_aggregates(); 00190 } else if (get_next_node() != radio().id()) { 00191 if ((cluster_->childs_count() == 1) && aggregates_vector.size() == 0) { 00192 msg_t aggMsg; 00193 aggregate.writeTo(aggMsg.payload()); 00194 aggMsg.set_payload_size(aggregate.size()); 00195 00196 // debug().debug("aggregation::send::from%x::type%d::to%x::%d:: sending value to parent",radio().id(), msg_t::AGG_MESSAGE_TYPE, get_next_node(), aggregate.get()); 00197 #ifdef DEBUG_AGGREGATION 00198 // debug().debug("AGGS;%x;%d;%x;%d",radio().id(), msg_t::AGG_MESSAGE_TYPE, radio().BROADCAST_ADDRESS, aggregate.get()); 00199 // debug().debug("aggregation::send::from%x::type%d::to%x::%d:: sending value to parent", 00200 //radio().id(), msg_t::AGG_MESSAGE_TYPE, get_next_node(), aggregate.get()); 00201 #endif 00202 radio().send(get_next_node(), 00203 aggMsg.buffer_size(), 00204 (uint8_t *) &aggMsg); 00205 aggregates_vector.clear(); 00206 } 00207 else { 00208 #ifdef DEBUG_AGGREGATION2 00209 // debug().debug("aggregation::send::%x waiting from children values",radio().id()); 00210 #endif 00211 aggregates_vector.push_back(aggregate); 00212 } 00213 00214 } else if (get_next_node() == radio().id()) {//node_role == INTERMEDIATE_NODE) { 00215 if ( cluster_->childs_count() == (aggregates_vector.size()-1) 00216 || (cluster_->childs_count()==0)) { 00217 00218 msg_t aggMsg; 00219 aggregate.writeTo(aggMsg.payload()); 00220 aggMsg.set_payload_size(aggregate.size()); 00221 aggMsg.set_level(tree_routing_->hops()); 00222 00223 radio().send(radio().BROADCAST_ADDRESS, 00224 aggMsg.buffer_size(), 00225 (uint8_t *) &aggMsg); 00226 // debug().debug("AGGS;%x;%d;%x;%d",radio().id(), msg_t::AGG_MESSAGE_TYPE, radio().BROADCAST_ADDRESS, aggregate.get()); 00227 // tree_routing_->send(0, aggMsg.buffer_size(), (uint8_t *) &aggMsg); 00228 aggregates_vector.clear(); 00229 aggregates_vector.push_back(aggregate); 00230 00231 } 00232 else { 00233 timer().template set_timer<self_t, &self_t::wait_for_chlds> (500, this, 0); 00234 aggregates_vector.push_back(aggregate); 00235 } 00236 //send_to_sink(); 00237 } 00238 00239 }; 00240 00241 void wait_for_chlds(void *data) { 00242 // debug().debug("aggregation::wait_for_chlds timeout ad %x",radio().id()); 00243 if (aggregates_vector.size()>1 && tree_routing_->hops()!=0) { 00244 // debug().debug("aggregation::send::%x::%x::",radio().id(),radio().BROADCAST_ADDRESS); 00245 Aggregate_t aggregate = combine_all_aggregates(); 00246 msg_t aggMsg; 00247 aggregate.writeTo(aggMsg.payload()); 00248 aggMsg.set_payload_size(aggregate.size()); 00249 aggMsg.set_level(tree_routing_->hops()); 00250 00251 radio().send(radio().BROADCAST_ADDRESS, 00252 aggMsg.buffer_size(), 00253 (uint8_t *) &aggMsg); 00254 00255 // tree_routing_->send(0, aggMsg.buffer_size(), (uint8_t *) &aggMsg); 00256 aggregates_vector.clear(); 00257 aggregates_vector.push_back(aggregate); 00258 } 00259 00260 }; 00261 00265 void set_role(uint8_t role) { 00266 node_role = role; 00267 }; 00268 00269 00270 private: 00278 void receive(node_id_t from, size_t len, block_data_t *msg, ExData const &ex) { 00279 00280 if (*msg==msg_t::AGG_MESSAGE_TYPE) { 00281 msg_t *amsg = (msg_t *)msg; 00282 block_data_t *payload = amsg->payload(); 00283 00284 #ifdef DEBUG_AGGREGATION 00285 // debug().debug("aggregation::receive::%d received from %d next_node %d level %d my level %d", 00286 // radio().id(),from, get_next_node(),amsg->level(),tree_routing_->hops()); 00287 #endif 00288 if (amsg->level() == msg_t::IN_CLUSTER) { 00289 debug_->debug("AGGAV;%x;%d;", radio_->id(), combine_all_aggregates().get());//greedy_partition_w_.get()); 00290 00291 if (node_role == SINK) { 00292 aggregates_vector.push_back(Aggregate_t(payload)); 00293 00294 // debug_->debug("AGGAV;%x;%d;", radio_->id(), combine_all_aggregates().get());//greedy_partition_w_.get()); 00295 // debug().debug("AGGR;%x;%d;%x;%d;SINK",radio().id(), msg_t::AGG_MESSAGE_TYPE, from, combine_all_aggregates().get() ); 00296 //TODO 00297 } else if (get_next_node() != radio().id()) {//node_role == NORMAL_NODE) { 00298 00299 // debug_->debug("AGGAV;%x;%d;", radio_->id(), combine_all_aggregates().get());//greedy_partition_w_.get()); 00300 // debug().debug("AGGR;%x;%d;%x;%d", 00301 // radio_->id(), msg_t::AGG_MESSAGE_TYPE, from, combine_all_aggregates().get() ); 00302 if (cluster_->childs_count() == (aggregates_vector.size())) { 00303 send(combine_all_aggregates()); 00304 } 00305 else { 00306 aggregates_vector.push_back(Aggregate_t(payload)); 00307 // timer().template set_timer<self_t, &self_t::wait_for_chlds> (3000, this, 0); 00308 } 00309 00310 } else if (get_next_node() == radio().id()) {//node_role == INTERMEDIATE_NODE) { 00311 00312 aggregates_vector.push_back(Aggregate_t(payload)); 00313 // debug_->debug("AGGAV;%x;%d;", radio_->id(), combine_all_aggregates().get());//greedy_partition_w_.get()); 00314 // debug().debug("AGGR;%x;%d;%x;%d", radio_->id(), msg_t::AGG_MESSAGE_TYPE, from, combine_all_aggregates().get() ); 00315 // debug().debug("aggregation::receive::to%x::type%d::from%x::%d:: from cluster", radio_->id(), msg_t::AGG_MESSAGE_TYPE, from, combine_all_aggregates().get() ); 00316 if ( cluster_->childs_count() == (aggregates_vector.size()-1) ) { 00317 // debug().debug("aggregation::receive::%x send to sink value: %x", radio_->id(), combine_all_aggregates().get() ); 00318 send(combine_all_aggregates()); 00319 } 00320 //TODO 00321 } 00322 } else if (amsg->level() == (tree_routing_->hops() + 1)) { 00323 debug_->debug("AGGAV;%x;%d;", radio_->id(), combine_all_aggregates().get());//greedy_partition_w_.get()); 00324 00325 if (node_role == SINK) { 00326 aggregates_vector.push_back(Aggregate_t(payload)); 00327 // debug_->debug("AGGAV;%x;%d;", radio_->id(), combine_all_aggregates().get());//greedy_partition_w_.get()); 00328 // debug().debug("AGGR;%x;%d;%x;%d;SINK",radio().id(), msg_t::AGG_MESSAGE_TYPE, from, combine_all_aggregates().get() ); 00329 // debug().debug("aggregation::receive::to%x::type%d::from%x::%d:: at SINK",radio().id(), msg_t::AGG_MESSAGE_TYPE, from, combine_all_aggregates().get()); 00330 } else { 00331 Aggregate_t aggregate(payload); 00332 aggregates_vector.push_back(Aggregate_t(payload)); 00333 aggregate = combine_all_aggregates(); 00334 // debug_->debug("AGGAV;%x;%d;", radio_->id(), combine_all_aggregates().get());//greedy_partition_w_.get()); 00335 // debug().debug("AGGS;%x;%d;%x;%d;SINK",radio().id(), msg_t::AGG_MESSAGE_TYPE, radio().BROADCAST_ADDRESS, aggregate.get()); 00336 // debug().debug("aggregation::send::from%x::type%d::to%x::%d::",radio().id(), msg_t::AGG_MESSAGE_TYPE, radio().BROADCAST_ADDRESS, aggregate.get()); 00337 aggregates_vector.clear(); 00338 aggregates_vector.push_back(aggregate); 00339 00340 msg_t aggMsg; 00341 aggregate.writeTo(aggMsg.payload()); 00342 aggMsg.set_payload_size(aggregate.size()); 00343 aggMsg.set_level(tree_routing_->hops()); 00344 00345 radio().send(radio().BROADCAST_ADDRESS, 00346 aggMsg.buffer_size(), 00347 (uint8_t *) &aggMsg); 00348 } 00349 } 00350 } 00351 }; 00352 00353 node_id_t get_next_node() { 00354 return cluster_->parent(); 00355 }; 00356 00362 void set_status(uint8_t status) { 00363 status_ = status; 00364 }; 00365 00369 int status() { 00370 return status_; 00371 }; 00372 00373 int recv_callback_id_; // callback for receive function 00374 uint8_t status_; // status of the module 00375 00376 tree_routing_t * tree_routing_; 00377 Cluster * cluster_; 00378 00384 uint8_t node_role; 00385 00386 struct messages_statistics { 00387 uint16_t aggregation_msg_count; 00388 uint32_t aggregation_msg_size; 00389 }msgs_stats; 00390 00391 Radio * radio_; 00392 Clock * clock_; 00393 Timer * timer_; 00394 Debug * debug_; 00395 00396 Radio& radio() { 00397 return *radio_; 00398 } 00399 00400 Clock& clock() { 00401 return *clock_; 00402 } 00403 00404 Timer& timer() { 00405 return *timer_; 00406 } 00407 00408 Debug& debug() { 00409 #ifdef SHAWN 00410 debug_->debug("\n"); 00411 #endif 00412 00413 return *debug_; 00414 } 00415 00416 }; 00417 } 00418 00419 #endif /* AGGREGATION_H */ 00420