IBR-DTNSuite  0.8
daemon/src/net/HTTPConvergenceLayer.cpp
Go to the documentation of this file.
00001 
00010 #include "net/HTTPConvergenceLayer.h"
00011 #include "core/BundleCore.h"
00012 #include <ibrdtn/data/ScopeControlHopLimitBlock.h>
00013 #include <memory>
00014 
00015 namespace dtn
00016 {
00017         namespace net
00018         {
00019 
/** Generic timeout value; not referenced in the visible code (presumably milliseconds - TODO confirm). */
const int TIMEOUT = 1000;
/** Delay before the download loop retries; the log message reports it as CONN_TIMEOUT/1000 seconds. */
const int CONN_TIMEOUT = 5000;

/** HTTP status code that queue() treats as a successful upload. */
const int HTTP_OK = 200;
/** HTTP status code 410; not referenced in the visible code (name suggests "no data available" - TODO confirm). */
const int HTTP_NO_DATA = 410;

/** curl_easy_perform() result that queue() treats as success (numerically equal to CURLE_OK). */
const int CURL_CONN_OK = 0;
/** libcurl result code 18 (CURLE_PARTIAL_FILE); not referenced in the visible code. */
const int CURL_PARTIAL_FILE = 18;
00034 
00035         /* CURL DEBUG SECTION START */
00036 
/**
 * Settings handed to the cURL debug callback (my_trace) through its
 * user pointer.
 */
struct data {
    /** Non-zero: dump received data as ASCII only, without the hex column. */
    char trace_ascii;
};
00043 
/**
 * Write a hex/ASCII dump of a memory area to a stdio stream.
 *
 * The first output line is a header with the label and the size (decimal
 * and hexadecimal). Every following line starts with the offset of its
 * first byte. In hex mode a line covers 16 bytes: a hex column followed by
 * the printable characters. In ASCII-only mode a line covers up to 64
 * bytes and additionally ends at every CR LF pair found in the data.
 * Bytes outside 0x20..0x7f are printed as '.'.
 *
 * @param text   label printed in the header line
 * @param stream destination stream; flushed before returning
 * @param ptr    first byte of the area to dump
 * @param size   number of bytes to dump
 * @param nohex  non-zero selects ASCII-only mode
 */
static void dump(const char *text,
                 FILE *stream, unsigned char *ptr, size_t size,
                 char nohex)
{
    size_t i;
    size_t c;

    unsigned int width = 0x10;

    if (nohex) {
        // without the hex output, we can fit more on screen
        width = 0x40;
    }

    fprintf(stream, "%s, %10.10ld bytes (0x%8.8lx)\n",
            text, static_cast<long>(size), static_cast<unsigned long>(size));

    for (i = 0; i < size; i += width) {

        fprintf(stream, "%4.4lx: ", static_cast<unsigned long>(i));

        if (!nohex) {
            // hex not disabled, show it (pad short final lines with blanks)
            for (c = 0; c < width; c++) {
                if (i + c < size)
                    fprintf(stream, "%02x ", ptr[i + c]);
                else
                    fputs("   ", stream);
            }
        }

        for (c = 0; (c < width) && (i + c < size); c++) {

            // check for 0D0A; if found, skip past and start a new line of output.
            // The adjustment relies on unsigned wrap-around: the loop's
            // "i += width" brings i to the byte right after the CR LF pair.
            if (nohex && (i + c + 1 < size) && ptr[i + c] == 0x0D && ptr[i + c + 1] == 0x0A) {
                i += (c + 2 - width);
                break;
            }

            const unsigned char byte = ptr[i + c];
            fprintf(stream, "%c", ((byte >= 0x20) && (byte < 0x80)) ? byte : '.');

            // check again for 0D0A, to avoid an extra \n if it's at width
            if (nohex && (i + c + 2 < size) && ptr[i + c + 1] == 0x0D && ptr[i + c + 2] == 0x0A) {
                i += (c + 3 - width);
                break;
            }
        }

        fputc('\n', stream);
    }
    fflush(stream);
}
00112 
00113 
00125         static int my_trace(CURL *handle, curl_infotype type,
00126                      char *data, size_t size, void *userp)
00127         {
00128                   struct data *config = (struct data *)userp;
00129                   const char *text;
00130                   (void)handle; // prevent compiler warning
00131 
00132                   switch (type) {
00133                           case CURLINFO_TEXT:
00134                                 fprintf(stderr, "== Info: %s", data);
00135                           default: // in case a new one is introduced to shock us
00136                                 return 0;
00137 
00138                           case CURLINFO_HEADER_IN:
00139                                 text = "<= Recv header";
00140                                 break;
00141                           case CURLINFO_DATA_IN:
00142                                 text = "<= Recv data";
00143                                 break;
00144                   }
00145 
00146                   dump(text, stderr, (unsigned char *)data, size, config->trace_ascii);
00147                   return 0;
00148          }
00149 
00150         /* CURL DEBUG SECTION END */
00151 
00152 
/**
 * Read callback for cURL uploads (signature of CURLOPT_READFUNCTION).
 *
 * Copies up to size * nmemb bytes from the input stream into the buffer
 * supplied by cURL.
 *
 * @param ptr   destination buffer provided by cURL
 * @param size  size of one element
 * @param nmemb number of elements that fit into the buffer
 * @param s     pointer to the std::istream to read from
 * @return number of bytes stored in ptr; 0 once the stream has reached
 *         end-of-file (or nothing could be read)
 */
static size_t HTTPConvergenceLayer_callback_read(void *ptr, size_t size, size_t nmemb, void *s)
{
    std::istream *stream = static_cast<std::istream*>(s);

    // nothing left to send
    if (stream->eof()) return 0;

    stream->read(static_cast<char*>(ptr), static_cast<std::streamsize>(size * nmemb));

    // report only what was actually extracted (may be less than requested)
    return static_cast<size_t>(stream->gcount());
}
00175 
/**
 * Write callback for cURL downloads (signature of CURLOPT_WRITEFUNCTION).
 *
 * Forwards the received bytes to the output stream and flushes it.
 *
 * @param ptr   received data
 * @param size  size of one element
 * @param nmemb number of elements
 * @param s     pointer to the std::ostream that receives the data
 * @return size * nmemb on success; 0 if the stream was not usable before
 *         the write or entered an error state while writing, so that a
 *         failed hand-over is not reported as success (for a non-empty
 *         chunk the mismatch makes libcurl abort the transfer)
 */
static size_t HTTPConvergenceLayer_callback_write(void *ptr, size_t size, size_t nmemb, void *s)
{
    std::ostream *stream = static_cast<std::ostream*>(s);
    const size_t total = size * nmemb;

    if (!stream->good()) return 0;

    stream->write(static_cast<char*>(ptr), static_cast<std::streamsize>(total));
    stream->flush();

    // the data did not reach the consumer - do not claim it was written
    if (!stream->good()) return 0;

    return total;
}
00196 
00204                 HTTPConvergenceLayer::HTTPConvergenceLayer(const std::string &server)
00205                  : _server(server), _push_iob(NULL), _running(true)
00206                 {
00207                         curl_global_init(CURL_GLOBAL_ALL);
00208                 }
00209 
00210 
00215                 HTTPConvergenceLayer::~HTTPConvergenceLayer()
00216                 {
00217                         curl_global_cleanup();
00218                 }
00219 
00227                 DownloadThread::DownloadThread(ibrcommon::iobuffer &buf)
00228                  : _stream(&buf)
00229                 {
00230                 }
00231 
00236                 DownloadThread::~DownloadThread()
00237                 {
00238                         join();
00239                 }
00240 
00248                 void DownloadThread::run()
00249                 {
00250                         try  {
00251                                 while(_stream.good())
00252                                 {
00253                                         try  {
00254                                                 dtn::data::Bundle bundle;
00255                                                 dtn::data::DefaultDeserializer(_stream) >> bundle;
00256                                                 
00257                                                 // validate the bundle
00258                                                 dtn::core::BundleCore::getInstance().validate(bundle);
00259 
00260                                                 // increment value in the scope control hop limit block
00261                                                 try {
00262                                                         dtn::data::ScopeControlHopLimitBlock &schl = bundle.getBlock<dtn::data::ScopeControlHopLimitBlock>();
00263                                                         schl.increment();
00264                                                 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { };
00265 
00266                                                 // raise default bundle received event
00267                                                 dtn::net::BundleReceivedEvent::raise(dtn::data::EID(), bundle, false, true);
00268                                         } catch (const ibrcommon::Exception &ex) {
00269                                                 IBRCOMMON_LOGGER_DEBUG(10) << "http error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00270                                         }
00271 
00272                                         yield();
00273                                 }
00274                         } catch (const ibrcommon::ThreadException &e)  {
00275                                 std::cerr << "error: " << e.what() << std::endl;
00276                         }
00277                 }
00278 
00279                 void DownloadThread::__cancellation()
00280                 {
00281                 }
00282 
00296                 void HTTPConvergenceLayer::queue(const dtn::core::Node &node, const ConvergenceLayer::Job &job)
00297                 {
00298                         dtn::storage::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00299                         std::string url_send;
00300 
00301                         long http_code = 0;
00302                         //double upload_size = 0;
00303 
00304                         try {
00305                                 // read the bundle out of the storage
00306                                 const dtn::data::Bundle bundle = storage.get(job._bundle);
00307 
00308                                 ibrcommon::BLOB::Reference ref = ibrcommon::BLOB::create();
00309                                 {
00310                                         ibrcommon::BLOB::iostream io = ref.iostream();
00311                                         dtn::data::DefaultSerializer(*io) << bundle;
00312                                 }
00313 
00314                                 ibrcommon::BLOB::iostream io = ref.iostream();
00315                                 size_t length = io.size();
00316                                 CURLcode res;
00317                                 CURL *curl_up;
00318 
00319                                 url_send = _server + "?eid=" + dtn::core::BundleCore::local.getString();
00320                                                                                  //+ "&dst-eid=dtn://experthe-laptop/filetransfer";
00321                                                                                  //+ "&priority=2";
00322                                                                                  //+ "&ttl=3600";
00323 
00324                                 //if (job._bundle.source.getString() == (dtn::core::BundleCore::local.getString() + "/echo-client"))
00325                                 //{
00326                                 //      url_send = _server + "?eid=" + dtn::core::BundleCore::local.getString() + "&dst-eid=echo-client";
00327                                 //}
00328 
00329                                 curl_up = curl_easy_init();
00330                                 if(curl_up)
00331                                 {
00332                                         /* we want to use our own read function */
00333                                         curl_easy_setopt(curl_up, CURLOPT_READFUNCTION, HTTPConvergenceLayer_callback_read);
00334 
00335                                         /* enable uploading */
00336                                         curl_easy_setopt(curl_up, CURLOPT_UPLOAD, 1L);
00337 
00338                                         /* HTTP PUT please */
00339                                         curl_easy_setopt(curl_up, CURLOPT_PUT, 1L);
00340 
00341                                         /* disable connection timeout */
00342                                         curl_easy_setopt(curl_up, CURLOPT_CONNECTTIMEOUT, 0);
00343 
00344                                         /* specify target URL, and note that this URL should include a file
00345                                            name, not only a directory */
00346                                         curl_easy_setopt(curl_up, CURLOPT_URL, url_send.c_str());
00347 
00348                                         /* now specify which file to upload */
00349                                         curl_easy_setopt(curl_up, CURLOPT_READDATA, &(*io));
00350 
00351                                         /* provide the size of the upload, we specicially typecast the value
00352                                            to curl_off_t since we must be sure to use the correct data size */
00353                                         curl_easy_setopt(curl_up, CURLOPT_INFILESIZE_LARGE,
00354                                                                          (curl_off_t)length);
00355 
00356                                         /* Now run off and do what you've been told! */
00357                                         res = curl_easy_perform(curl_up);
00358 
00359                                         if(res == CURL_CONN_OK)
00360                                         {
00361                                                 /* get HTTP Header StatusCode */
00362                                                 curl_easy_getinfo (curl_up, CURLINFO_RESPONSE_CODE, &http_code);
00363                                                 //curl_easy_getinfo (curl, CURLINFO_SIZE_UPLOAD, &upload_size);
00364 
00365                                                 /* DEBUG OUTPUT INFORMATION */
00366                                                 //std::cout << "CURL CODE    : " << res << std::endl;
00367                                                 //std::cout << "HTTP CODE    : " << http_code << std::endl;
00368                                                 //std::cout << "UPLOAD_SIZE: " << upload_size << " Bytes" << std::endl;
00369                                                 /* DEBUG OUTPUT INFORMATION */
00370 
00371                                                 if(http_code == HTTP_OK)
00372                                                 {
00373                                                         dtn::net::TransferCompletedEvent::raise(job._destination, bundle);
00374                                                         dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED);
00375                                                 }
00376                                         }
00377 
00378                                         curl_easy_cleanup(curl_up);
00379                                 }
00380                         } catch (const dtn::storage::BundleStorage::NoBundleFoundException&) {
00381                                 // send transfer aborted event
00382                                 dtn::net::TransferAbortedEvent::raise(node.getEID(), job._bundle, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED);
00383                         }
00384 
00385                 }
00386 
00390                 dtn::core::Node::Protocol HTTPConvergenceLayer::getDiscoveryProtocol() const
00391                 {
00392                         return dtn::core::Node::CONN_HTTP;
00393                 }
00394 
00399                 void HTTPConvergenceLayer::componentUp()
00400                 {
00401                 }
00402 
00414                 void HTTPConvergenceLayer::componentRun()
00415                 {
00416 
00417                         std::string url = _server + "?eid=" + dtn::core::BundleCore::local.getString();
00418 
00419                         CURL *curl_down;
00420                         CURLcode res;
00421 
00422                         /* CURL DEBUG */
00423             //struct data config;
00424                         //config.trace_ascii = 1; /* enable ascii tracing */
00425 
00426 
00427                         //long http_code = 0;
00428                         //double download_size = 0;
00429                         //long connects = 0;
00430 
00431                         while (_running)
00432                         {
00433                                 curl_down = curl_easy_init();
00434 
00435                                 while(curl_down)
00436                                 {
00437                                         curl_easy_setopt(curl_down, CURLOPT_URL, url.c_str());
00438 
00439                                         /* disable connection timeout */
00440                                         curl_easy_setopt(curl_down, CURLOPT_CONNECTTIMEOUT, 0);
00441 
00442                                         /* no progress meter please */
00443                                         curl_easy_setopt(curl_down, CURLOPT_NOPROGRESS, 1L);
00444 
00445                                         /* cURL DEBUG options */
00446                                         //curl_easy_setopt(curl_down, CURLOPT_DEBUGFUNCTION, my_trace);
00447                                     //curl_easy_setopt(curl_down, CURLOPT_DEBUGDATA, &config);
00448 
00449                                     //curl_easy_setopt(curl_down, CURLOPT_DEBUGFUNCTION, my_trace);
00450                                         //curl_easy_setopt(curl_down, CURLOPT_VERBOSE, 1);
00451 
00452                                         // create a receiver thread
00453                                         {
00454                                                 ibrcommon::MutexLock l(_push_iob_mutex);
00455                                                 _push_iob = new ibrcommon::iobuffer();
00456                                         }
00457 
00458                                         std::auto_ptr<ibrcommon::iobuffer> auto_kill(_push_iob);
00459                                         std::ostream os(_push_iob);
00460                                         DownloadThread receiver(*_push_iob);
00461 
00462                                         /* send all data to this function  */
00463                                         curl_easy_setopt(curl_down, CURLOPT_WRITEFUNCTION, HTTPConvergenceLayer_callback_write);
00464 
00465                                         /* now specify where to write data */
00466                                         curl_easy_setopt(curl_down, CURLOPT_WRITEDATA, &os);
00467 
00468                                         /* do curl */
00469                                         receiver.start();
00470                                         res = curl_easy_perform(curl_down);
00471 
00472                                         {
00473                                                 ibrcommon::MutexLock l(_push_iob_mutex);
00474                                                 /* finalize iobuffer */
00475                                                 _push_iob->finalize();
00476                                                 receiver.join();
00477                                                 _push_iob = NULL;
00478                                         }
00479 
00480                                         /* get HTTP Header StatusCode */
00481                                         //curl_easy_getinfo (curl_down, CURLINFO_RESPONSE_CODE, &http_code);
00482                                         //curl_easy_getinfo (curl, CURLINFO_SIZE_DOWNLOAD, &download_size);
00483                                         //curl_easy_getinfo (curl, CURLINFO_NUM_CONNECTS, &connects);
00484 
00485                                         /* DEBUG OUTPUT INFORMATION */
00486                                         //std::cout << "CURL CODE    : " << res << std::endl;
00487                                         //std::cerr << "HTTP CODE    : " << http_code << std::endl;
00488                                         //std::cout << "DOWNLOAD_SIZE: " << download_size << " Bytes" << std::endl;
00489                                         //std::cout << "NUM_CONNECTS : " << connects << " Connects" << std::endl;
00490                                         /* DEBUG OUTPUT INFORMATION */
00491 
00492                                         /* Wait some time an retry to connect */
00493                                         sleep(CONN_TIMEOUT);  // Wenn Verbindung nicht hergestellt werden konnte warte 5 sec.
00494                                         IBRCOMMON_LOGGER_DEBUG(10) << "http error: " << "Couldn't connect to server ... wait " << CONN_TIMEOUT/1000 << "s until retry" << IBRCOMMON_LOGGER_ENDL;
00495 
00496                                 }
00497 
00498                                 /* always cleanup */
00499                                 curl_easy_cleanup(curl_down);
00500                         }
00501                         yield();
00502                 }
00503 
00509                 void HTTPConvergenceLayer::componentDown()
00510                 {
00511                 }
00512 
00517                 void HTTPConvergenceLayer::__cancellation()
00518                 {
00519                         _running = false;
00520 
00521                         ibrcommon::MutexLock l(_push_iob_mutex);
00522                         if (_push_iob != NULL)
00523                         {
00524                                 _push_iob->finalize();
00525                                 _push_iob = NULL;
00526                         }
00527                 }
00528 
00533                 const std::string HTTPConvergenceLayer::getName() const
00534                 {
00535                         return "HTTPConvergenceLayer";
00536                 }
00537         }
00538 }