IBR-DTNSuite
0.8
|
00001 /* 00002 * DatagramConvergenceLayer.cpp 00003 * 00004 * Created on: 21.11.2011 00005 * Author: morgenro 00006 */ 00007 00008 #include "net/DatagramConvergenceLayer.h" 00009 #include "net/DatagramConnection.h" 00010 00011 #include "core/BundleCore.h" 00012 #include "core/TimeEvent.h" 00013 #include "core/NodeEvent.h" 00014 00015 #include <ibrcommon/Logger.h> 00016 #include <ibrcommon/thread/MutexLock.h> 00017 00018 #include <string.h> 00019 00020 namespace dtn 00021 { 00022 namespace net 00023 { 00024 DatagramConvergenceLayer::DatagramConvergenceLayer(DatagramService *ds) 00025 : _service(ds), _discovery_sn(0) 00026 { 00027 } 00028 00029 DatagramConvergenceLayer::~DatagramConvergenceLayer() 00030 { 00031 // wait until all connections are down 00032 { 00033 ibrcommon::MutexLock l(_connection_cond); 00034 while (_connections.size() != 0) _connection_cond.wait(); 00035 } 00036 00037 join(); 00038 delete _service; 00039 } 00040 00041 dtn::core::Node::Protocol DatagramConvergenceLayer::getDiscoveryProtocol() const 00042 { 00043 return _service->getProtocol(); 00044 } 00045 00046 void DatagramConvergenceLayer::callback_send(DatagramConnection&, const char &flags, const unsigned int &seqno, const std::string &destination, const char *buf, int len) throw (DatagramException) 00047 { 00048 // only on sender at once 00049 ibrcommon::MutexLock l(_send_lock); 00050 00051 // forward the send request to DatagramService 00052 _service->send(HEADER_SEGMENT, flags, seqno, destination, buf, len); 00053 } 00054 00055 void DatagramConvergenceLayer::callback_ack(DatagramConnection&, const unsigned int &seqno, const std::string &destination) throw (DatagramException) 00056 { 00057 // only on sender at once 00058 ibrcommon::MutexLock l(_send_lock); 00059 00060 // forward the send request to DatagramService 00061 _service->send(HEADER_ACK, 0, seqno, destination, NULL, 0); 00062 } 00063 00064 void DatagramConvergenceLayer::queue(const dtn::core::Node &node, const ConvergenceLayer::Job &job) 00065 { 00066 const std::list<dtn::core::Node::URI> uri_list = node.get(_service->getProtocol()); 00067 if (uri_list.empty()) return; 00068 00069 // get the first element of the result 00070 const dtn::core::Node::URI &uri = uri_list.front(); 00071 00072 // some debugging 00073 IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConvergenceLayer::queue"<< IBRCOMMON_LOGGER_ENDL; 00074 00075 // lock the connection list while working with it 00076 ibrcommon::MutexLock lc(_connection_cond); 00077 00078 // get a new or the existing connection for this address 00079 DatagramConnection &conn = getConnection( uri.value ); 00080 00081 // queue the job to the connection 00082 conn.queue(job); 00083 } 00084 00085 DatagramConnection& DatagramConvergenceLayer::getConnection(const std::string &identifier) 00086 { 00087 // Test if connection for this address already exist 00088 for(std::list<DatagramConnection*>::iterator i = _connections.begin(); i != _connections.end(); ++i) 00089 { 00090 IBRCOMMON_LOGGER_DEBUG(10) << "Connection identifier: " << (*i)->getIdentifier() << IBRCOMMON_LOGGER_ENDL; 00091 if ((*i)->getIdentifier() == identifier) 00092 return *(*i); 00093 } 00094 00095 // Connection does not exist, create one and put it into the list 00096 DatagramConnection *connection = new DatagramConnection(identifier, _service->getParameter(), (*this)); 00097 00098 _connections.push_back(connection); 00099 _connection_cond.signal(true); 00100 00101 IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConvergenceLayer::getConnection "<< connection->getIdentifier() << IBRCOMMON_LOGGER_ENDL; 00102 connection->start(); 00103 return *connection; 00104 } 00105 00106 void DatagramConvergenceLayer::connectionUp(const DatagramConnection *conn) 00107 { 00108 IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConvergenceLayer::connectionUp: " << conn->getIdentifier() << IBRCOMMON_LOGGER_ENDL; 00109 } 00110 00111 void DatagramConvergenceLayer::connectionDown(const DatagramConnection *conn) 00112 { 00113 ibrcommon::MutexLock lc(_connection_cond); 00114 00115 std::list<DatagramConnection*>::iterator i; 00116 for(i = _connections.begin(); i != _connections.end(); ++i) 00117 { 00118 if ((*i) == conn) 00119 { 00120 IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConvergenceLayer::connectionDown: " << conn->getIdentifier() << IBRCOMMON_LOGGER_ENDL; 00121 00122 _connections.erase(i); 00123 _connection_cond.signal(true); 00124 return; 00125 } 00126 } 00127 00128 IBRCOMMON_LOGGER(error) << "DatagramConvergenceLayer::connectionDown: " << conn->getIdentifier() << " not found!" << IBRCOMMON_LOGGER_ENDL; 00129 } 00130 00131 void DatagramConvergenceLayer::componentUp() 00132 { 00133 bindEvent(dtn::core::TimeEvent::className); 00134 try { 00135 _service->bind(); 00136 } catch (const std::exception &e) { 00137 IBRCOMMON_LOGGER_DEBUG(10) << "Failed to add DatagramConvergenceLayer on " << _service->getInterface().toString() << IBRCOMMON_LOGGER_ENDL; 00138 IBRCOMMON_LOGGER_DEBUG(10) << "Exception: " << e.what() << IBRCOMMON_LOGGER_ENDL; 00139 } 00140 } 00141 00142 void DatagramConvergenceLayer::componentDown() 00143 { 00144 unbindEvent(dtn::core::TimeEvent::className); 00145 00146 // shutdown all connections 00147 { 00148 ibrcommon::MutexLock l(_connection_cond); 00149 for(std::list<DatagramConnection*>::iterator i = _connections.begin(); i != _connections.end(); ++i) 00150 { 00151 (*i)->shutdown(); 00152 } 00153 } 00154 } 00155 00156 void DatagramConvergenceLayer::sendAnnoucement() 00157 { 00158 DiscoveryAnnouncement announcement(DiscoveryAnnouncement::DISCO_VERSION_01, dtn::core::BundleCore::local); 00159 00160 // set sequencenumber 00161 announcement.setSequencenumber(_discovery_sn); 00162 _discovery_sn++; 00163 00164 // clear all services 00165 announcement.clearServices(); 00166 00167 // serialize announcement 00168 stringstream ss; 00169 ss << announcement; 00170 00171 int len = ss.str().size(); 00172 00173 try { 00174 // only on sender at once 00175 ibrcommon::MutexLock l(_send_lock); 00176 00177 // forward the send request to DatagramService 00178 _service->send(HEADER_BROADCAST, 0, 0, ss.str().c_str(), len); 00179 } catch (const DatagramException&) { 00180 // ignore any send failure 00181 }; 00182 } 00183 00184 void DatagramConvergenceLayer::componentRun() 00185 { 00186 _running = true; 00187 00188 size_t maxlen = _service->getParameter().max_msg_length; 00189 std::string address; 00190 unsigned int seqno = 0; 00191 char flags = 0; 00192 char type = 0; 00193 char data[maxlen]; 00194 size_t len = 0; 00195 00196 while (_running) 00197 { 00198 IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConvergenceLayer::componentRun early" << IBRCOMMON_LOGGER_ENDL; 00199 00200 try { 00201 // Receive full frame from socket 00202 len = _service->recvfrom(data, maxlen, type, flags, seqno, address); 00203 } catch (const DatagramException&) { 00204 _running = false; 00205 break; 00206 } 00207 00208 IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConvergenceLayer::componentRun" << IBRCOMMON_LOGGER_ENDL; 00209 IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConvergenceLayer::componentRun: ADDRESS " << address << IBRCOMMON_LOGGER_ENDL; 00210 00211 // Check for extended header and retrieve if available 00212 if (type == HEADER_BROADCAST) 00213 { 00214 IBRCOMMON_LOGGER(notice) << "Received announcement for DatagramConvergenceLayer discovery" << IBRCOMMON_LOGGER_ENDL; 00215 DiscoveryAnnouncement announce; 00216 stringstream ss; 00217 ss.write(data, len); 00218 ss >> announce; 00219 00220 // convert the announcement into NodeEvents 00221 Node n(announce.getEID()); 00222 00223 // timeout value 00224 size_t to_value = 30; 00225 00226 // add 00227 n.add(Node::URI(Node::NODE_DISCOVERED, _service->getProtocol(), address, to_value, 20)); 00228 00229 const std::list<DiscoveryService> services = announce.getServices(); 00230 for (std::list<DiscoveryService>::const_iterator iter = services.begin(); iter != services.end(); iter++) 00231 { 00232 const DiscoveryService &s = (*iter); 00233 n.add(Node::Attribute(Node::NODE_DISCOVERED, s.getName(), s.getParameters(), to_value, 20)); 00234 } 00235 00236 // create and raise a new event 00237 dtn::core::NodeEvent::raise(n, dtn::core::NODE_INFO_UPDATED); 00238 00239 continue; 00240 } 00241 else if ( type == HEADER_SEGMENT ) 00242 { 00243 ibrcommon::MutexLock lc(_connection_cond); 00244 00245 // Connection instance for this address 00246 DatagramConnection& connection = getConnection(address); 00247 00248 try { 00249 // Decide in which queue to write based on the src address 00250 connection.queue(flags, seqno, data, len); 00251 } catch (const ibrcommon::Exception&) { }; 00252 } 00253 else if ( type == HEADER_ACK ) 00254 { 00255 ibrcommon::MutexLock lc(_connection_cond); 00256 00257 // Connection instance for this address 00258 DatagramConnection& connection = getConnection(address); 00259 00260 // Decide in which queue to write based on the src address 00261 connection.ack(seqno); 00262 } 00263 00264 yield(); 00265 } 00266 } 00267 00268 void DatagramConvergenceLayer::raiseEvent(const Event *evt) 00269 { 00270 try { 00271 const TimeEvent &time=dynamic_cast<const TimeEvent&>(*evt); 00272 if (time.getAction() == TIME_SECOND_TICK) 00273 if (time.getTimestamp() % 5 == 0) 00274 sendAnnoucement(); 00275 } catch (const std::bad_cast&) 00276 {} 00277 } 00278 00279 void DatagramConvergenceLayer::__cancellation() 00280 { 00281 _running = false; 00282 _service->shutdown(); 00283 } 00284 00285 const std::string DatagramConvergenceLayer::getName() const 00286 { 00287 return "DatagramConvergenceLayer"; 00288 } 00289 } /* namespace data */ 00290 } /* namespace dtn */