IBR-DTNSuite  0.8
daemon/src/net/DatagramConvergenceLayer.cpp
Go to the documentation of this file.
00001 /*
00002  * DatagramConvergenceLayer.cpp
00003  *
00004  *  Created on: 21.11.2011
00005  *      Author: morgenro
00006  */
00007 
00008 #include "net/DatagramConvergenceLayer.h"
00009 #include "net/DatagramConnection.h"
00010 
00011 #include "core/BundleCore.h"
00012 #include "core/TimeEvent.h"
00013 #include "core/NodeEvent.h"
00014 
00015 #include <ibrcommon/Logger.h>
00016 #include <ibrcommon/thread/MutexLock.h>
00017 
00018 #include <string.h>
00019 
00020 namespace dtn
00021 {
00022         namespace net
00023         {
00024                 DatagramConvergenceLayer::DatagramConvergenceLayer(DatagramService *ds)
00025                  : _service(ds), _discovery_sn(0)
00026                 {
00027                 }
00028 
00029                 DatagramConvergenceLayer::~DatagramConvergenceLayer()
00030                 {
00031                         // wait until all connections are down
00032                         {
00033                                 ibrcommon::MutexLock l(_connection_cond);
00034                                 while (_connections.size() != 0) _connection_cond.wait();
00035                         }
00036 
00037                         join();
00038                         delete _service;
00039                 }
00040 
00041                 dtn::core::Node::Protocol DatagramConvergenceLayer::getDiscoveryProtocol() const
00042                 {
00043                         return _service->getProtocol();
00044                 }
00045 
00046                 void DatagramConvergenceLayer::callback_send(DatagramConnection&, const char &flags, const unsigned int &seqno, const std::string &destination, const char *buf, int len) throw (DatagramException)
00047                 {
00048                         // only on sender at once
00049                         ibrcommon::MutexLock l(_send_lock);
00050 
00051                         // forward the send request to DatagramService
00052                         _service->send(HEADER_SEGMENT, flags, seqno, destination, buf, len);
00053                 }
00054 
00055                 void DatagramConvergenceLayer::callback_ack(DatagramConnection&, const unsigned int &seqno, const std::string &destination) throw (DatagramException)
00056                 {
00057                         // only on sender at once
00058                         ibrcommon::MutexLock l(_send_lock);
00059 
00060                         // forward the send request to DatagramService
00061                         _service->send(HEADER_ACK, 0, seqno, destination, NULL, 0);
00062                 }
00063 
00064                 void DatagramConvergenceLayer::queue(const dtn::core::Node &node, const ConvergenceLayer::Job &job)
00065                 {
00066                         const std::list<dtn::core::Node::URI> uri_list = node.get(_service->getProtocol());
00067                         if (uri_list.empty()) return;
00068 
00069                         // get the first element of the result
00070                         const dtn::core::Node::URI &uri = uri_list.front();
00071 
00072                         // some debugging
00073                         IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConvergenceLayer::queue"<< IBRCOMMON_LOGGER_ENDL;
00074 
00075                         // lock the connection list while working with it
00076                         ibrcommon::MutexLock lc(_connection_cond);
00077 
00078                         // get a new or the existing connection for this address
00079                         DatagramConnection &conn = getConnection( uri.value );
00080 
00081                         // queue the job to the connection
00082                         conn.queue(job);
00083                 }
00084 
00085                 DatagramConnection& DatagramConvergenceLayer::getConnection(const std::string &identifier)
00086                 {
00087                         // Test if connection for this address already exist
00088                         for(std::list<DatagramConnection*>::iterator i = _connections.begin(); i != _connections.end(); ++i)
00089                         {
00090                                 IBRCOMMON_LOGGER_DEBUG(10) << "Connection identifier: " << (*i)->getIdentifier() << IBRCOMMON_LOGGER_ENDL;
00091                                 if ((*i)->getIdentifier() == identifier)
00092                                         return *(*i);
00093                         }
00094 
00095                         // Connection does not exist, create one and put it into the list
00096                         DatagramConnection *connection = new DatagramConnection(identifier, _service->getParameter(), (*this));
00097 
00098                         _connections.push_back(connection);
00099                         _connection_cond.signal(true);
00100 
00101                         IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConvergenceLayer::getConnection "<< connection->getIdentifier() << IBRCOMMON_LOGGER_ENDL;
00102                         connection->start();
00103                         return *connection;
00104                 }
00105 
00106                 void DatagramConvergenceLayer::connectionUp(const DatagramConnection *conn)
00107                 {
00108                         IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConvergenceLayer::connectionUp: " << conn->getIdentifier() << IBRCOMMON_LOGGER_ENDL;
00109                 }
00110 
00111                 void DatagramConvergenceLayer::connectionDown(const DatagramConnection *conn)
00112                 {
00113                         ibrcommon::MutexLock lc(_connection_cond);
00114 
00115                         std::list<DatagramConnection*>::iterator i;
00116                         for(i = _connections.begin(); i != _connections.end(); ++i)
00117                         {
00118                                 if ((*i) == conn)
00119                                 {
00120                                         IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConvergenceLayer::connectionDown: " << conn->getIdentifier() << IBRCOMMON_LOGGER_ENDL;
00121 
00122                                         _connections.erase(i);
00123                                         _connection_cond.signal(true);
00124                                         return;
00125                                 }
00126                         }
00127 
00128                         IBRCOMMON_LOGGER(error) << "DatagramConvergenceLayer::connectionDown: " << conn->getIdentifier() << " not found!" << IBRCOMMON_LOGGER_ENDL;
00129                 }
00130 
00131                 void DatagramConvergenceLayer::componentUp()
00132                 {
00133                         bindEvent(dtn::core::TimeEvent::className);
00134                         try {
00135                                 _service->bind();
00136                         } catch (const std::exception &e) {
00137                                 IBRCOMMON_LOGGER_DEBUG(10) << "Failed to add DatagramConvergenceLayer on " << _service->getInterface().toString() << IBRCOMMON_LOGGER_ENDL;
00138                                 IBRCOMMON_LOGGER_DEBUG(10) << "Exception: " << e.what() << IBRCOMMON_LOGGER_ENDL;
00139                         }
00140                 }
00141 
00142                 void DatagramConvergenceLayer::componentDown()
00143                 {
00144                         unbindEvent(dtn::core::TimeEvent::className);
00145 
00146                         // shutdown all connections
00147                         {
00148                                 ibrcommon::MutexLock l(_connection_cond);
00149                                 for(std::list<DatagramConnection*>::iterator i = _connections.begin(); i != _connections.end(); ++i)
00150                                 {
00151                                         (*i)->shutdown();
00152                                 }
00153                         }
00154                 }
00155 
00156                 void DatagramConvergenceLayer::sendAnnoucement()
00157                 {
00158                         DiscoveryAnnouncement announcement(DiscoveryAnnouncement::DISCO_VERSION_01, dtn::core::BundleCore::local);
00159 
00160                         // set sequencenumber
00161                         announcement.setSequencenumber(_discovery_sn);
00162                         _discovery_sn++;
00163 
00164                         // clear all services
00165                         announcement.clearServices();
00166 
00167                         // serialize announcement
00168                         stringstream ss;
00169                         ss << announcement;
00170 
00171                         int len = ss.str().size();
00172 
00173                         try {
00174                                 // only on sender at once
00175                                 ibrcommon::MutexLock l(_send_lock);
00176 
00177                                 // forward the send request to DatagramService
00178                                 _service->send(HEADER_BROADCAST, 0, 0, ss.str().c_str(), len);
00179                         } catch (const DatagramException&) {
00180                                 // ignore any send failure
00181                         };
00182                 }
00183 
00184                 void DatagramConvergenceLayer::componentRun()
00185                 {
00186                         _running = true;
00187 
00188                         size_t maxlen = _service->getParameter().max_msg_length;
00189                         std::string address;
00190                         unsigned int seqno = 0;
00191                         char flags = 0;
00192                         char type = 0;
00193                         char data[maxlen];
00194                         size_t len = 0;
00195 
00196                         while (_running)
00197                         {
00198                                 IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConvergenceLayer::componentRun early" << IBRCOMMON_LOGGER_ENDL;
00199 
00200                                 try {
00201                                         // Receive full frame from socket
00202                                         len = _service->recvfrom(data, maxlen, type, flags, seqno, address);
00203                                 } catch (const DatagramException&) {
00204                                         _running = false;
00205                                         break;
00206                                 }
00207 
00208                                 IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConvergenceLayer::componentRun" << IBRCOMMON_LOGGER_ENDL;
00209                                 IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConvergenceLayer::componentRun: ADDRESS " << address << IBRCOMMON_LOGGER_ENDL;
00210 
00211                                 // Check for extended header and retrieve if available
00212                                 if (type == HEADER_BROADCAST)
00213                                 {
00214                                         IBRCOMMON_LOGGER(notice) << "Received announcement for DatagramConvergenceLayer discovery" << IBRCOMMON_LOGGER_ENDL;
00215                                         DiscoveryAnnouncement announce;
00216                                         stringstream ss;
00217                                         ss.write(data, len);
00218                                         ss >> announce;
00219 
00220                                         // convert the announcement into NodeEvents
00221                                         Node n(announce.getEID());
00222 
00223                                         // timeout value
00224                                         size_t to_value = 30;
00225 
00226                                         // add
00227                                         n.add(Node::URI(Node::NODE_DISCOVERED, _service->getProtocol(), address, to_value, 20));
00228 
00229                                         const std::list<DiscoveryService> services = announce.getServices();
00230                                         for (std::list<DiscoveryService>::const_iterator iter = services.begin(); iter != services.end(); iter++)
00231                                         {
00232                                                 const DiscoveryService &s = (*iter);
00233                                                 n.add(Node::Attribute(Node::NODE_DISCOVERED, s.getName(), s.getParameters(), to_value, 20));
00234                                         }
00235 
00236                                         // create and raise a new event
00237                                         dtn::core::NodeEvent::raise(n, dtn::core::NODE_INFO_UPDATED);
00238 
00239                                         continue;
00240                                 }
00241                                 else if ( type == HEADER_SEGMENT )
00242                                 {
00243                                         ibrcommon::MutexLock lc(_connection_cond);
00244 
00245                                         // Connection instance for this address
00246                                         DatagramConnection& connection = getConnection(address);
00247 
00248                                         try {
00249                                                 // Decide in which queue to write based on the src address
00250                                                 connection.queue(flags, seqno, data, len);
00251                                         } catch (const ibrcommon::Exception&) { };
00252                                 }
00253                                 else if ( type == HEADER_ACK )
00254                                 {
00255                                         ibrcommon::MutexLock lc(_connection_cond);
00256 
00257                                         // Connection instance for this address
00258                                         DatagramConnection& connection = getConnection(address);
00259 
00260                                         // Decide in which queue to write based on the src address
00261                                         connection.ack(seqno);
00262                                 }
00263 
00264                                 yield();
00265                         }
00266                 }
00267 
00268                 void DatagramConvergenceLayer::raiseEvent(const Event *evt)
00269                 {
00270                         try {
00271                                 const TimeEvent &time=dynamic_cast<const TimeEvent&>(*evt);
00272                                 if (time.getAction() == TIME_SECOND_TICK)
00273                                         if (time.getTimestamp() % 5 == 0)
00274                                                 sendAnnoucement();
00275                         } catch (const std::bad_cast&)
00276                         {}
00277                 }
00278 
00279                 void DatagramConvergenceLayer::__cancellation()
00280                 {
00281                         _running = false;
00282                         _service->shutdown();
00283                 }
00284 
00285                 const std::string DatagramConvergenceLayer::getName() const
00286                 {
00287                         return "DatagramConvergenceLayer";
00288                 }
00289         } /* namespace data */
00290 } /* namespace dtn */