IBR-DTNSuite
0.8
|
00001 #include "net/LOWPANConnection.h" 00002 #include "net/BundleReceivedEvent.h" 00003 #include "core/BundleEvent.h" 00004 #include "net/TransferCompletedEvent.h" 00005 #include "net/TransferAbortedEvent.h" 00006 #include "routing/RequeueBundleEvent.h" 00007 #include "core/BundleCore.h" 00008 00009 #include <ibrcommon/Logger.h> 00010 #include <ibrcommon/thread/MutexLock.h> 00011 00012 #include <ibrdtn/data/ScopeControlHopLimitBlock.h> 00013 #include <ibrdtn/utils/Utils.h> 00014 #include <ibrdtn/data/Serializer.h> 00015 00016 #include <unistd.h> 00017 #include <stdlib.h> 00018 #include <stdio.h> 00019 #include <string.h> 00020 00021 #include <iostream> 00022 #include <list> 00023 00024 using namespace dtn::data; 00025 00026 namespace dtn 00027 { 00028 namespace net 00029 { 00030 LOWPANConnection::LOWPANConnection(unsigned short address, LOWPANConvergenceLayer &cl) 00031 : _address(address), _stream(cl, address), _sender(_stream), _cl(cl) 00032 { 00033 } 00034 00035 LOWPANConnection::~LOWPANConnection() 00036 { 00037 } 00038 00039 ibrcommon::lowpanstream& LOWPANConnection::getStream() 00040 { 00041 return _stream; 00042 } 00043 00044 void LOWPANConnection::setup() 00045 { 00046 _sender.start(); 00047 } 00048 00049 void LOWPANConnection::finally() 00050 { 00051 _sender.stop(); 00052 _sender.join(); 00053 00054 // remove this connection from the connection list 00055 _cl.remove(this); 00056 } 00057 00058 void LOWPANConnection::run() 00059 { 00060 try { 00061 while(_stream.good()) 00062 { 00063 try { 00064 dtn::data::DefaultDeserializer deserializer(_stream); 00065 dtn::data::Bundle bundle; 00066 deserializer >> bundle; 00067 00068 // validate the bundle 00069 dtn::core::BundleCore::getInstance().validate(bundle); 00070 00071 IBRCOMMON_LOGGER_DEBUG(10) << "LOWPANConnection::run"<< IBRCOMMON_LOGGER_ENDL; 00072 00073 // TODO: determine sender 00074 EID sender; 00075 00076 // increment value in the scope control hop limit block 00077 try { 00078 dtn::data::ScopeControlHopLimitBlock &schl = bundle.getBlock<dtn::data::ScopeControlHopLimitBlock>(); 00079 schl.increment(); 00080 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { }; 00081 00082 // raise default bundle received event 00083 dtn::net::BundleReceivedEvent::raise(sender, bundle, false, true); 00084 00085 } catch (const dtn::InvalidDataException &ex) { 00086 IBRCOMMON_LOGGER_DEBUG(10) << "Received a invalid bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00087 } catch (const ibrcommon::IOException &ex) { 00088 IBRCOMMON_LOGGER_DEBUG(10) << "IOException: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00089 } 00090 } 00091 } catch (std::exception &ex) { 00092 IBRCOMMON_LOGGER_DEBUG(10) << "Thread died: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00093 } 00094 } 00095 00096 void LOWPANConnection::__cancellation() 00097 { 00098 _sender.stop(); 00099 } 00100 00101 // class LOWPANConnectionSender 00102 LOWPANConnectionSender::LOWPANConnectionSender(ibrcommon::lowpanstream &stream) 00103 : _stream(stream) 00104 { 00105 } 00106 00107 LOWPANConnectionSender::~LOWPANConnectionSender() 00108 { 00109 } 00110 00111 void LOWPANConnectionSender::queue(const ConvergenceLayer::Job &job) 00112 { 00113 IBRCOMMON_LOGGER_DEBUG(10) << ":LOWPANConnectionSender::queue"<< IBRCOMMON_LOGGER_ENDL; 00114 _queue.push(job); 00115 } 00116 00117 void LOWPANConnectionSender::run() 00118 { 00119 try { 00120 while(_stream.good()) 00121 { 00122 ConvergenceLayer::Job job = _queue.getnpop(true); 00123 dtn::data::DefaultSerializer serializer(_stream); 00124 00125 IBRCOMMON_LOGGER_DEBUG(10) << ":LOWPANConnectionSender::run"<< IBRCOMMON_LOGGER_ENDL; 00126 00127 dtn::storage::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage(); 00128 00129 // read the bundle out of the storage 00130 const dtn::data::Bundle bundle = storage.get(job._bundle); 00131 00132 // Put bundle into stringstream 00133 serializer << bundle; _stream.flush(); 00134 // raise bundle event 00135 dtn::net::TransferCompletedEvent::raise(job._destination, bundle); 00136 dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED); 00137 } 00138 // FIXME: Exit strategy when sending on socket failed. Like destroying the connection object 00139 // Also check what needs to be done when the node is not reachable (transfer requeue...) 00140 00141 IBRCOMMON_LOGGER_DEBUG(10) << ":LOWPANConnectionSender::run stream destroyed"<< IBRCOMMON_LOGGER_ENDL; 00142 } catch (std::exception &ex) { 00143 IBRCOMMON_LOGGER_DEBUG(10) << "Thread died: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00144 } 00145 } 00146 00147 void LOWPANConnectionSender::__cancellation() 00148 { 00149 _queue.abort(); 00150 } 00151 } 00152 }