IBR-DTNSuite  0.8
daemon/src/net/LOWPANConnection.cpp
Go to the documentation of this file.
00001 #include "net/LOWPANConnection.h"
00002 #include "net/BundleReceivedEvent.h"
00003 #include "core/BundleEvent.h"
00004 #include "net/TransferCompletedEvent.h"
00005 #include "net/TransferAbortedEvent.h"
00006 #include "routing/RequeueBundleEvent.h"
00007 #include "core/BundleCore.h"
00008 
00009 #include <ibrcommon/Logger.h>
00010 #include <ibrcommon/thread/MutexLock.h>
00011 
00012 #include <ibrdtn/data/ScopeControlHopLimitBlock.h>
00013 #include <ibrdtn/utils/Utils.h>
00014 #include <ibrdtn/data/Serializer.h>
00015 
00016 #include <unistd.h>
00017 #include <stdlib.h>
00018 #include <stdio.h>
00019 #include <string.h>
00020 
00021 #include <iostream>
00022 #include <list>
00023 
00024 using namespace dtn::data;
00025 
00026 namespace dtn
00027 {
00028         namespace net
00029         {
00030                 LOWPANConnection::LOWPANConnection(unsigned short address, LOWPANConvergenceLayer &cl)
00031                         : _address(address), _stream(cl, address), _sender(_stream), _cl(cl)
00032                 {
00033                 }
00034 
00035                 LOWPANConnection::~LOWPANConnection()
00036                 {
00037                 }
00038 
00039                 ibrcommon::lowpanstream& LOWPANConnection::getStream()
00040                 {
00041                         return _stream;
00042                 }
00043 
00044                 void LOWPANConnection::setup()
00045                 {
00046                         _sender.start();
00047                 }
00048 
00049                 void LOWPANConnection::finally()
00050                 {
00051                         _sender.stop();
00052                         _sender.join();
00053 
00054                         // remove this connection from the connection list
00055                         _cl.remove(this);
00056                 }
00057 
00058                 void LOWPANConnection::run()
00059                 {
00060                         try {
00061                                 while(_stream.good())
00062                                 {
00063                                         try {
00064                                                 dtn::data::DefaultDeserializer deserializer(_stream);
00065                                                 dtn::data::Bundle bundle;
00066                                                 deserializer >> bundle;
00067                                                 
00068                                                 // validate the bundle
00069                                                 dtn::core::BundleCore::getInstance().validate(bundle);
00070 
00071                                                 IBRCOMMON_LOGGER_DEBUG(10) << "LOWPANConnection::run"<< IBRCOMMON_LOGGER_ENDL;
00072 
00073                                                 // TODO: determine sender
00074                                                 EID sender;
00075 
00076                                                 // increment value in the scope control hop limit block
00077                                                 try {
00078                                                         dtn::data::ScopeControlHopLimitBlock &schl = bundle.getBlock<dtn::data::ScopeControlHopLimitBlock>();
00079                                                         schl.increment();
00080                                                 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { };
00081 
00082                                                 // raise default bundle received event
00083                                                 dtn::net::BundleReceivedEvent::raise(sender, bundle, false, true);
00084 
00085                                         } catch (const dtn::InvalidDataException &ex) {
00086                                                 IBRCOMMON_LOGGER_DEBUG(10) << "Received a invalid bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00087                                         } catch (const ibrcommon::IOException &ex) {
00088                                                 IBRCOMMON_LOGGER_DEBUG(10) << "IOException: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00089                                         }
00090                                 }
00091                         } catch (std::exception &ex) {
00092                                 IBRCOMMON_LOGGER_DEBUG(10) << "Thread died: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00093                         }
00094                 }
00095 
00096                 void LOWPANConnection::__cancellation()
00097                 {
00098                         _sender.stop();
00099                 }
00100 
00101                 // class LOWPANConnectionSender
00102                 LOWPANConnectionSender::LOWPANConnectionSender(ibrcommon::lowpanstream &stream)
00103                 : _stream(stream)
00104                 {
00105                 }
00106 
00107                 LOWPANConnectionSender::~LOWPANConnectionSender()
00108                 {
00109                 }
00110 
00111                 void LOWPANConnectionSender::queue(const ConvergenceLayer::Job &job)
00112                 {
00113                         IBRCOMMON_LOGGER_DEBUG(10) << ":LOWPANConnectionSender::queue"<< IBRCOMMON_LOGGER_ENDL;
00114                         _queue.push(job);
00115                 }
00116 
00117                 void LOWPANConnectionSender::run()
00118                 {
00119                         try {
00120                                 while(_stream.good())
00121                                 {
00122                                         ConvergenceLayer::Job job = _queue.getnpop(true);
00123                                         dtn::data::DefaultSerializer serializer(_stream);
00124 
00125                                         IBRCOMMON_LOGGER_DEBUG(10) << ":LOWPANConnectionSender::run"<< IBRCOMMON_LOGGER_ENDL;
00126 
00127                                         dtn::storage::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00128 
00129                                         // read the bundle out of the storage
00130                                         const dtn::data::Bundle bundle = storage.get(job._bundle);
00131 
00132                                         // Put bundle into stringstream
00133                                         serializer << bundle; _stream.flush();
00134                                         // raise bundle event
00135                                         dtn::net::TransferCompletedEvent::raise(job._destination, bundle);
00136                                         dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED);
00137                                 }
00138                                 // FIXME: Exit strategy when sending on socket failed. Like destroying the connection object
00139                                 // Also check what needs to be done when the node is not reachable (transfer requeue...)
00140 
00141                                 IBRCOMMON_LOGGER_DEBUG(10) << ":LOWPANConnectionSender::run stream destroyed"<< IBRCOMMON_LOGGER_ENDL;
00142                         } catch (std::exception &ex) {
00143                                 IBRCOMMON_LOGGER_DEBUG(10) << "Thread died: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00144                         }
00145                 }
00146 
00147                 void LOWPANConnectionSender::__cancellation()
00148                 {
00149                         _queue.abort();
00150                 }
00151         }
00152 }