IBR-DTNSuite
0.8
|
00001 #include "net/UDPConvergenceLayer.h" 00002 #include "net/BundleReceivedEvent.h" 00003 #include "net/TransferCompletedEvent.h" 00004 #include "net/TransferAbortedEvent.h" 00005 #include "core/BundleEvent.h" 00006 #include "core/BundleCore.h" 00007 #include "routing/RequeueBundleEvent.h" 00008 #include "Configuration.h" 00009 00010 #include <ibrdtn/utils/Utils.h> 00011 #include <ibrdtn/data/Serializer.h> 00012 #include <ibrdtn/data/ScopeControlHopLimitBlock.h> 00013 00014 #include <ibrcommon/net/UnicastSocket.h> 00015 #include <ibrcommon/net/vaddress.h> 00016 #include <ibrcommon/net/vinterface.h> 00017 #include <ibrcommon/data/BLOB.h> 00018 #include <ibrcommon/Logger.h> 00019 #include <ibrcommon/thread/MutexLock.h> 00020 00021 #include <sys/socket.h> 00022 #include <poll.h> 00023 #include <errno.h> 00024 00025 #include <sys/types.h> 00026 #include <netinet/in.h> 00027 #include <arpa/inet.h> 00028 #include <unistd.h> 00029 #include <stdlib.h> 00030 #include <stdio.h> 00031 #include <string.h> 00032 #include <fcntl.h> 00033 #include <limits.h> 00034 00035 #include <iostream> 00036 #include <list> 00037 00038 00039 using namespace dtn::data; 00040 00041 namespace dtn 00042 { 00043 namespace net 00044 { 00045 const int UDPConvergenceLayer::DEFAULT_PORT = 4556; 00046 00047 UDPConvergenceLayer::UDPConvergenceLayer(ibrcommon::vinterface net, int port, unsigned int mtu) 00048 : _socket(NULL), _net(net), _port(port), m_maxmsgsize(mtu), _running(false) 00049 { 00050 _socket = new ibrcommon::UnicastSocket(); 00051 } 00052 00053 UDPConvergenceLayer::~UDPConvergenceLayer() 00054 { 00055 componentDown(); 00056 delete _socket; 00057 } 00058 00059 dtn::core::Node::Protocol UDPConvergenceLayer::getDiscoveryProtocol() const 00060 { 00061 return dtn::core::Node::CONN_UDPIP; 00062 } 00063 00064 void UDPConvergenceLayer::update(const ibrcommon::vinterface &iface, std::string &name, std::string ¶ms) throw (dtn::net::DiscoveryServiceProvider::NoServiceHereException) 00065 { 00066 if (iface == _net) 00067 { 00068 name = "udpcl"; 00069 stringstream service; 00070 00071 try { 00072 std::list<ibrcommon::vaddress> list = _net.getAddresses(ibrcommon::vaddress::VADDRESS_INET); 00073 if (!list.empty()) 00074 { 00075 service << "ip=" << list.front().get(false) << ";port=" << _port << ";"; 00076 } 00077 else 00078 { 00079 service << "port=" << _port << ";"; 00080 } 00081 } catch (const ibrcommon::vinterface::interface_not_set&) { 00082 service << "port=" << _port << ";"; 00083 }; 00084 00085 params = service.str(); 00086 } 00087 else 00088 { 00089 throw dtn::net::DiscoveryServiceProvider::NoServiceHereException(); 00090 } 00091 } 00092 00093 void UDPConvergenceLayer::queue(const dtn::core::Node &node, const ConvergenceLayer::Job &job) 00094 { 00095 const std::list<dtn::core::Node::URI> uri_list = node.get(dtn::core::Node::CONN_UDPIP); 00096 if (uri_list.empty()) 00097 { 00098 dtn::net::TransferAbortedEvent::raise(node.getEID(), job._bundle, dtn::net::TransferAbortedEvent::REASON_UNDEFINED); 00099 return; 00100 } 00101 00102 dtn::storage::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage(); 00103 00104 const dtn::core::Node::URI &uri = uri_list.front(); 00105 00106 std::string address = "0.0.0.0"; 00107 unsigned int port = 0; 00108 00109 // read values 00110 uri.decode(address, port); 00111 00112 // get the address of the node 00113 ibrcommon::vaddress addr(address); 00114 00115 try { 00116 // read the bundle out of the storage 00117 const dtn::data::Bundle bundle = storage.get(job._bundle); 00118 00119 // create a dummy serializer 00120 dtn::data::DefaultSerializer dummy(std::cout); 00121 00122 size_t header = dummy.getLength((const PrimaryBlock&)bundle); 00123 header += 20; // two times SDNV through fragmentation 00124 00125 unsigned int size = dummy.getLength(bundle); 00126 00127 if (size > m_maxmsgsize) 00128 { 00129 // abort transmission if fragmentation is disabled 00130 if (!dtn::daemon::Configuration::getInstance().getNetwork().doFragmentation() 00131 && !bundle.get(dtn::data::PrimaryBlock::DONT_FRAGMENT)) 00132 { 00133 throw ConnectionInterruptedException(); 00134 } 00135 00136 const size_t psize = bundle.getBlock<dtn::data::PayloadBlock>().getLength(); 00137 const size_t fragment_size = m_maxmsgsize - header; 00138 const size_t fragment_count = (psize / fragment_size) + (((psize % fragment_size) > 0) ? 1 : 0); 00139 00140 IBRCOMMON_LOGGER_DEBUG(15) << "MTU of " << m_maxmsgsize << " is too small to carry " << psize << " bytes of payload." << IBRCOMMON_LOGGER_ENDL; 00141 IBRCOMMON_LOGGER_DEBUG(15) << "create " << fragment_count << " fragments with " << fragment_size << " bytes each." << IBRCOMMON_LOGGER_ENDL; 00142 00143 for (size_t i = 0; i < fragment_count; i++) 00144 { 00145 dtn::data::BundleFragment fragment(bundle, i * fragment_size, fragment_size); 00146 00147 std::stringstream ss; 00148 dtn::data::DefaultSerializer serializer(ss); 00149 00150 serializer << fragment; 00151 string data = ss.str(); 00152 00153 // set write lock 00154 ibrcommon::MutexLock l(m_writelock); 00155 00156 // send converted line back to client. 00157 int ret = _socket->send(addr, port, data.c_str(), data.length()); 00158 00159 if (ret == -1) 00160 { 00161 // CL is busy, requeue bundle 00162 dtn::routing::RequeueBundleEvent::raise(job._destination, job._bundle); 00163 00164 return; 00165 } 00166 } 00167 } 00168 else 00169 { 00170 std::stringstream ss; 00171 dtn::data::DefaultSerializer serializer(ss); 00172 00173 serializer << bundle; 00174 string data = ss.str(); 00175 00176 // set write lock 00177 ibrcommon::MutexLock l(m_writelock); 00178 00179 // send converted line back to client. 00180 int ret = _socket->send(addr, port, data.c_str(), data.length()); 00181 00182 if (ret == -1) 00183 { 00184 // CL is busy, requeue bundle 00185 dtn::routing::RequeueBundleEvent::raise(job._destination, job._bundle); 00186 00187 return; 00188 } 00189 } 00190 00191 // raise bundle event 00192 dtn::net::TransferCompletedEvent::raise(job._destination, bundle); 00193 dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED); 00194 } catch (const dtn::storage::BundleStorage::NoBundleFoundException&) { 00195 // send transfer aborted event 00196 dtn::net::TransferAbortedEvent::raise(node.getEID(), job._bundle, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED); 00197 } 00198 00199 } 00200 00201 UDPConvergenceLayer& UDPConvergenceLayer::operator>>(dtn::data::Bundle &bundle) 00202 { 00203 ibrcommon::MutexLock l(m_readlock); 00204 00205 char data[m_maxmsgsize]; 00206 00207 // data waiting 00208 int len = _socket->receive(data, m_maxmsgsize); 00209 00210 if (len > 0) 00211 { 00212 // read all data into a stream 00213 stringstream ss; 00214 ss.write(data, len); 00215 00216 // get the bundle 00217 dtn::data::DefaultDeserializer(ss, dtn::core::BundleCore::getInstance()) >> bundle; 00218 } 00219 00220 return (*this); 00221 } 00222 00223 void UDPConvergenceLayer::componentUp() 00224 { 00225 try { 00226 try { 00227 ibrcommon::UnicastSocket &sock = dynamic_cast<ibrcommon::UnicastSocket&>(*_socket); 00228 sock.bind(_port, _net); 00229 } catch (const std::bad_cast&) { 00230 00231 } 00232 } catch (const ibrcommon::udpsocket::SocketException &ex) { 00233 IBRCOMMON_LOGGER(error) << "Failed to add UDP ConvergenceLayer on " << _net.toString() << ":" << _port << IBRCOMMON_LOGGER_ENDL; 00234 IBRCOMMON_LOGGER(error) << " Error: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00235 } 00236 } 00237 00238 void UDPConvergenceLayer::componentDown() 00239 { 00240 stop(); 00241 join(); 00242 } 00243 00244 void UDPConvergenceLayer::componentRun() 00245 { 00246 _running = true; 00247 00248 while (_running) 00249 { 00250 try { 00251 dtn::data::Bundle bundle; 00252 (*this) >> bundle; 00253 00254 // TODO: determine sender 00255 EID sender; 00256 00257 // increment value in the scope control hop limit block 00258 try { 00259 dtn::data::ScopeControlHopLimitBlock &schl = bundle.getBlock<dtn::data::ScopeControlHopLimitBlock>(); 00260 schl.increment(); 00261 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { }; 00262 00263 // raise default bundle received event 00264 dtn::net::BundleReceivedEvent::raise(sender, bundle, false, true); 00265 00266 } catch (const dtn::InvalidDataException &ex) { 00267 IBRCOMMON_LOGGER(warning) << "Received a invalid bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00268 } catch (const ibrcommon::IOException &ex) { 00269 00270 } 00271 yield(); 00272 } 00273 } 00274 00275 void UDPConvergenceLayer::__cancellation() 00276 { 00277 _running = false; 00278 _socket->shutdown(); 00279 } 00280 00281 const std::string UDPConvergenceLayer::getName() const 00282 { 00283 return "UDPConvergenceLayer"; 00284 } 00285 } 00286 }