IBR-DTNSuite  0.8
daemon/src/net/UDPConvergenceLayer.cpp
Go to the documentation of this file.
00001 #include "net/UDPConvergenceLayer.h"
00002 #include "net/BundleReceivedEvent.h"
00003 #include "net/TransferCompletedEvent.h"
00004 #include "net/TransferAbortedEvent.h"
00005 #include "core/BundleEvent.h"
00006 #include "core/BundleCore.h"
00007 #include "routing/RequeueBundleEvent.h"
00008 #include "Configuration.h"
00009 
00010 #include <ibrdtn/utils/Utils.h>
00011 #include <ibrdtn/data/Serializer.h>
00012 #include <ibrdtn/data/ScopeControlHopLimitBlock.h>
00013 
00014 #include <ibrcommon/net/UnicastSocket.h>
00015 #include <ibrcommon/net/vaddress.h>
00016 #include <ibrcommon/net/vinterface.h>
00017 #include <ibrcommon/data/BLOB.h>
00018 #include <ibrcommon/Logger.h>
00019 #include <ibrcommon/thread/MutexLock.h>
00020 
00021 #include <sys/socket.h>
00022 #include <poll.h>
00023 #include <errno.h>
00024 
00025 #include <sys/types.h>
00026 #include <netinet/in.h>
00027 #include <arpa/inet.h>
00028 #include <unistd.h>
00029 #include <stdlib.h>
00030 #include <stdio.h>
00031 #include <string.h>
00032 #include <fcntl.h>
00033 #include <limits.h>
00034 
#include <iostream>
#include <list>
#include <sstream>
#include <string>
#include <vector>
00037 
00038 
00039 using namespace dtn::data;
00040 
00041 namespace dtn
00042 {
00043         namespace net
00044         {
00045                 const int UDPConvergenceLayer::DEFAULT_PORT = 4556;
00046 
00047                 UDPConvergenceLayer::UDPConvergenceLayer(ibrcommon::vinterface net, int port, unsigned int mtu)
00048                         : _socket(NULL), _net(net), _port(port), m_maxmsgsize(mtu), _running(false)
00049                 {
00050                         _socket = new ibrcommon::UnicastSocket();
00051                 }
00052 
00053                 UDPConvergenceLayer::~UDPConvergenceLayer()
00054                 {
00055                         componentDown();
00056                         delete _socket;
00057                 }
00058 
00059                 dtn::core::Node::Protocol UDPConvergenceLayer::getDiscoveryProtocol() const
00060                 {
00061                         return dtn::core::Node::CONN_UDPIP;
00062                 }
00063 
00064                 void UDPConvergenceLayer::update(const ibrcommon::vinterface &iface, std::string &name, std::string &params) throw (dtn::net::DiscoveryServiceProvider::NoServiceHereException)
00065                 {
00066                         if (iface == _net)
00067                         {
00068                                 name = "udpcl";
00069                                 stringstream service;
00070 
00071                                 try {
00072                                         std::list<ibrcommon::vaddress> list = _net.getAddresses(ibrcommon::vaddress::VADDRESS_INET);
00073                                         if (!list.empty())
00074                                         {
00075                                                  service << "ip=" << list.front().get(false) << ";port=" << _port << ";";
00076                                         }
00077                                         else
00078                                         {
00079                                                 service << "port=" << _port << ";";
00080                                         }
00081                                 } catch (const ibrcommon::vinterface::interface_not_set&) {
00082                                         service << "port=" << _port << ";";
00083                                 };
00084 
00085                                 params = service.str();
00086                         }
00087                         else
00088                         {
00089                                  throw dtn::net::DiscoveryServiceProvider::NoServiceHereException();
00090                         }
00091                 }
00092 
00093                 void UDPConvergenceLayer::queue(const dtn::core::Node &node, const ConvergenceLayer::Job &job)
00094                 {
00095                         const std::list<dtn::core::Node::URI> uri_list = node.get(dtn::core::Node::CONN_UDPIP);
00096                         if (uri_list.empty())
00097                         {
00098                                 dtn::net::TransferAbortedEvent::raise(node.getEID(), job._bundle, dtn::net::TransferAbortedEvent::REASON_UNDEFINED);
00099                                 return;
00100                         }
00101 
00102                         dtn::storage::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00103 
00104                         const dtn::core::Node::URI &uri = uri_list.front();
00105 
00106                         std::string address = "0.0.0.0";
00107                         unsigned int port = 0;
00108 
00109                         // read values
00110                         uri.decode(address, port);
00111 
00112                         // get the address of the node
00113                         ibrcommon::vaddress addr(address);
00114 
00115                         try {
00116                                 // read the bundle out of the storage
00117                                 const dtn::data::Bundle bundle = storage.get(job._bundle);
00118 
00119                                 // create a dummy serializer
00120                                 dtn::data::DefaultSerializer dummy(std::cout);
00121 
00122                                 size_t header = dummy.getLength((const PrimaryBlock&)bundle);
00123                                 header += 20; // two times SDNV through fragmentation
00124 
00125                                 unsigned int size = dummy.getLength(bundle);
00126 
00127                                 if (size > m_maxmsgsize)
00128                                 {
00129                                         // abort transmission if fragmentation is disabled
00130                                         if (!dtn::daemon::Configuration::getInstance().getNetwork().doFragmentation()
00131                                                         && !bundle.get(dtn::data::PrimaryBlock::DONT_FRAGMENT))
00132                                         {
00133                                                 throw ConnectionInterruptedException();
00134                                         }
00135 
00136                                         const size_t psize = bundle.getBlock<dtn::data::PayloadBlock>().getLength();
00137                                         const size_t fragment_size = m_maxmsgsize - header;
00138                                         const size_t fragment_count = (psize / fragment_size) + (((psize % fragment_size) > 0) ? 1 : 0);
00139 
00140                                         IBRCOMMON_LOGGER_DEBUG(15) << "MTU of " << m_maxmsgsize << " is too small to carry " << psize << " bytes of payload." << IBRCOMMON_LOGGER_ENDL;
00141                                         IBRCOMMON_LOGGER_DEBUG(15) << "create " << fragment_count << " fragments with " << fragment_size << " bytes each." << IBRCOMMON_LOGGER_ENDL;
00142 
00143                                         for (size_t i = 0; i < fragment_count; i++)
00144                                         {
00145                                                 dtn::data::BundleFragment fragment(bundle, i * fragment_size, fragment_size);
00146 
00147                                                 std::stringstream ss;
00148                                                 dtn::data::DefaultSerializer serializer(ss);
00149 
00150                                                 serializer << fragment;
00151                                                 string data = ss.str();
00152 
00153                                                 // set write lock
00154                                                 ibrcommon::MutexLock l(m_writelock);
00155 
00156                                                 // send converted line back to client.
00157                                                 int ret = _socket->send(addr, port, data.c_str(), data.length());
00158 
00159                                                 if (ret == -1)
00160                                                 {
00161                                                         // CL is busy, requeue bundle
00162                                                         dtn::routing::RequeueBundleEvent::raise(job._destination, job._bundle);
00163 
00164                                                         return;
00165                                                 }
00166                                         }
00167                                 }
00168                                 else
00169                                 {
00170                                         std::stringstream ss;
00171                                         dtn::data::DefaultSerializer serializer(ss);
00172 
00173                                         serializer << bundle;
00174                                         string data = ss.str();
00175 
00176                                         // set write lock
00177                                         ibrcommon::MutexLock l(m_writelock);
00178 
00179                                         // send converted line back to client.
00180                                         int ret = _socket->send(addr, port, data.c_str(), data.length());
00181 
00182                                         if (ret == -1)
00183                                         {
00184                                                 // CL is busy, requeue bundle
00185                                                 dtn::routing::RequeueBundleEvent::raise(job._destination, job._bundle);
00186 
00187                                                 return;
00188                                         }
00189                                 }
00190 
00191                                 // raise bundle event
00192                                 dtn::net::TransferCompletedEvent::raise(job._destination, bundle);
00193                                 dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED);
00194                         } catch (const dtn::storage::BundleStorage::NoBundleFoundException&) {
00195                                 // send transfer aborted event
00196                                 dtn::net::TransferAbortedEvent::raise(node.getEID(), job._bundle, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED);
00197                         }
00198 
00199                 }
00200 
00201                 UDPConvergenceLayer& UDPConvergenceLayer::operator>>(dtn::data::Bundle &bundle)
00202                 {
00203                         ibrcommon::MutexLock l(m_readlock);
00204 
00205                         char data[m_maxmsgsize];
00206 
00207                         // data waiting
00208                         int len = _socket->receive(data, m_maxmsgsize);
00209 
00210                         if (len > 0)
00211                         {
00212                                 // read all data into a stream
00213                                 stringstream ss;
00214                                 ss.write(data, len);
00215 
00216                                 // get the bundle
00217                                 dtn::data::DefaultDeserializer(ss, dtn::core::BundleCore::getInstance()) >> bundle;
00218                         }
00219 
00220                         return (*this);
00221                 }
00222 
00223                 void UDPConvergenceLayer::componentUp()
00224                 {
00225                         try {
00226                                 try {
00227                                         ibrcommon::UnicastSocket &sock = dynamic_cast<ibrcommon::UnicastSocket&>(*_socket);
00228                                         sock.bind(_port, _net);
00229                                 } catch (const std::bad_cast&) {
00230 
00231                                 }
00232                         } catch (const ibrcommon::udpsocket::SocketException &ex) {
00233                                 IBRCOMMON_LOGGER(error) << "Failed to add UDP ConvergenceLayer on " << _net.toString() << ":" << _port << IBRCOMMON_LOGGER_ENDL;
00234                                 IBRCOMMON_LOGGER(error) << "      Error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00235                         }
00236                 }
00237 
00238                 void UDPConvergenceLayer::componentDown()
00239                 {
00240                         stop();
00241                         join();
00242                 }
00243 
00244                 void UDPConvergenceLayer::componentRun()
00245                 {
00246                         _running = true;
00247 
00248                         while (_running)
00249                         {
00250                                 try {
00251                                         dtn::data::Bundle bundle;
00252                                         (*this) >> bundle;
00253 
00254                                         // TODO: determine sender
00255                                         EID sender;
00256 
00257                                         // increment value in the scope control hop limit block
00258                                         try {
00259                                                 dtn::data::ScopeControlHopLimitBlock &schl = bundle.getBlock<dtn::data::ScopeControlHopLimitBlock>();
00260                                                 schl.increment();
00261                                         } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { };
00262 
00263                                         // raise default bundle received event
00264                                         dtn::net::BundleReceivedEvent::raise(sender, bundle, false, true);
00265 
00266                                 } catch (const dtn::InvalidDataException &ex) {
00267                                         IBRCOMMON_LOGGER(warning) << "Received a invalid bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00268                                 } catch (const ibrcommon::IOException &ex) {
00269 
00270                                 }
00271                                 yield();
00272                         }
00273                 }
00274 
00275                 void UDPConvergenceLayer::__cancellation()
00276                 {
00277                         _running = false;
00278                         _socket->shutdown();
00279                 }
00280 
00281                 const std::string UDPConvergenceLayer::getName() const
00282                 {
00283                         return "UDPConvergenceLayer";
00284                 }
00285         }
00286 }