IBR-DTNSuite
0.8
|
00001 00010 #include "net/HTTPConvergenceLayer.h" 00011 #include "core/BundleCore.h" 00012 #include <ibrdtn/data/ScopeControlHopLimitBlock.h> 00013 #include <memory> 00014 00015 namespace dtn 00016 { 00017 namespace net 00018 { 00019 00021 const int TIMEOUT = 1000; 00023 const int CONN_TIMEOUT = 5000; 00024 00026 const int HTTP_OK = 200; 00028 const int HTTP_NO_DATA = 410; 00029 00031 const int CURL_CONN_OK = 0; 00033 const int CURL_PARTIAL_FILE = 18; 00034 00035 /* CURL DEBUG SECTION START */ 00036 00040 struct data { 00041 char trace_ascii; 00042 }; 00043 00055 static void dump(const char *text, 00056 FILE *stream, unsigned char *ptr, size_t size, 00057 char nohex) 00058 { 00059 size_t i; 00060 size_t c; 00061 00062 unsigned int width=0x10; 00063 00064 if(nohex) 00065 00066 // without the hex output, we can fit more on screen 00067 00068 width = 0x40; 00069 00070 fprintf(stream, "%s, %10.10ld bytes (0x%8.8lx)\n", 00071 text, (long)size, (long)size); 00072 00073 for(i=0; i<size; i+= width) { 00074 00075 fprintf(stream, "%4.4lx: ", (long)i); 00076 00077 if(!nohex) { 00078 00079 // hex not disabled, show it 00080 00081 for(c = 0; c < width; c++) 00082 if(i+c < size) 00083 fprintf(stream, "%02x ", ptr[i+c]); 00084 else 00085 fputs(" ", stream); 00086 } 00087 00088 for(c = 0; (c < width) && (i+c < size); c++) { 00089 00090 // check for 0D0A; if found, skip past and start a new line of output 00091 00092 if (nohex && (i+c+1 < size) && ptr[i+c]==0x0D && ptr[i+c+1]==0x0A) { 00093 i+=(c+2-width); 00094 break; 00095 } 00096 fprintf(stream, "%c", 00097 (ptr[i+c]>=0x20) && (ptr[i+c]<0x80)?ptr[i+c]:'.'); 00098 00099 // check again for 0D0A, to avoid an extra \n if it's at width 00100 00101 if (nohex && (i+c+2 < size) && ptr[i+c+1]==0x0D && ptr[i+c+2]==0x0A) { 00102 i+=(c+3-width); 00103 break; 00104 } 00105 } 00106 00107 00108 fputc('\n', stream); 00109 } 00110 fflush(stream); 00111 } 00112 00113 00125 static int my_trace(CURL *handle, curl_infotype type, 00126 char *data, size_t size, void *userp) 00127 { 00128 struct data *config = (struct data *)userp; 00129 const char *text; 00130 (void)handle; // prevent compiler warning 00131 00132 switch (type) { 00133 case CURLINFO_TEXT: 00134 fprintf(stderr, "== Info: %s", data); 00135 default: // in case a new one is introduced to shock us 00136 return 0; 00137 00138 case CURLINFO_HEADER_IN: 00139 text = "<= Recv header"; 00140 break; 00141 case CURLINFO_DATA_IN: 00142 text = "<= Recv data"; 00143 break; 00144 } 00145 00146 dump(text, stderr, (unsigned char *)data, size, config->trace_ascii); 00147 return 0; 00148 } 00149 00150 /* CURL DEBUG SECTION END */ 00151 00152 00161 static size_t HTTPConvergenceLayer_callback_read(void *ptr, size_t size, size_t nmemb, void *s) 00162 { 00163 size_t retcode = 0; 00164 std::istream *stream = static_cast<std::istream*>(s); 00165 00166 if (stream->eof()) return 0; 00167 00168 char *buffer = static_cast<char*>(ptr); 00169 00170 stream->read(buffer, (size * nmemb)); 00171 retcode = stream->gcount(); 00172 00173 return retcode; 00174 } 00175 00184 static size_t HTTPConvergenceLayer_callback_write(void *ptr, size_t size, size_t nmemb, void *s) 00185 { 00186 std::ostream *stream = static_cast<std::ostream*>(s); 00187 char *buffer = static_cast<char*>(ptr); 00188 00189 if (!stream->good()) return 0; 00190 00191 stream->write(buffer, (size * nmemb)); 00192 stream->flush(); 00193 00194 return (size * nmemb); 00195 } 00196 00204 HTTPConvergenceLayer::HTTPConvergenceLayer(const std::string &server) 00205 : _server(server), _push_iob(NULL), _running(true) 00206 { 00207 curl_global_init(CURL_GLOBAL_ALL); 00208 } 00209 00210 00215 HTTPConvergenceLayer::~HTTPConvergenceLayer() 00216 { 00217 curl_global_cleanup(); 00218 } 00219 00227 DownloadThread::DownloadThread(ibrcommon::iobuffer &buf) 00228 : _stream(&buf) 00229 { 00230 } 00231 00236 DownloadThread::~DownloadThread() 00237 { 00238 join(); 00239 } 00240 00248 void DownloadThread::run() 00249 { 00250 try { 00251 while(_stream.good()) 00252 { 00253 try { 00254 dtn::data::Bundle bundle; 00255 dtn::data::DefaultDeserializer(_stream) >> bundle; 00256 00257 // validate the bundle 00258 dtn::core::BundleCore::getInstance().validate(bundle); 00259 00260 // increment value in the scope control hop limit block 00261 try { 00262 dtn::data::ScopeControlHopLimitBlock &schl = bundle.getBlock<dtn::data::ScopeControlHopLimitBlock>(); 00263 schl.increment(); 00264 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { }; 00265 00266 // raise default bundle received event 00267 dtn::net::BundleReceivedEvent::raise(dtn::data::EID(), bundle, false, true); 00268 } catch (const ibrcommon::Exception &ex) { 00269 IBRCOMMON_LOGGER_DEBUG(10) << "http error: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00270 } 00271 00272 yield(); 00273 } 00274 } catch (const ibrcommon::ThreadException &e) { 00275 std::cerr << "error: " << e.what() << std::endl; 00276 } 00277 } 00278 00279 void DownloadThread::__cancellation() 00280 { 00281 } 00282 00296 void HTTPConvergenceLayer::queue(const dtn::core::Node &node, const ConvergenceLayer::Job &job) 00297 { 00298 dtn::storage::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage(); 00299 std::string url_send; 00300 00301 long http_code = 0; 00302 //double upload_size = 0; 00303 00304 try { 00305 // read the bundle out of the storage 00306 const dtn::data::Bundle bundle = storage.get(job._bundle); 00307 00308 ibrcommon::BLOB::Reference ref = ibrcommon::BLOB::create(); 00309 { 00310 ibrcommon::BLOB::iostream io = ref.iostream(); 00311 dtn::data::DefaultSerializer(*io) << bundle; 00312 } 00313 00314 ibrcommon::BLOB::iostream io = ref.iostream(); 00315 size_t length = io.size(); 00316 CURLcode res; 00317 CURL *curl_up; 00318 00319 url_send = _server + "?eid=" + dtn::core::BundleCore::local.getString(); 00320 //+ "&dst-eid=dtn://experthe-laptop/filetransfer"; 00321 //+ "&priority=2"; 00322 //+ "&ttl=3600"; 00323 00324 //if (job._bundle.source.getString() == (dtn::core::BundleCore::local.getString() + "/echo-client")) 00325 //{ 00326 // url_send = _server + "?eid=" + dtn::core::BundleCore::local.getString() + "&dst-eid=echo-client"; 00327 //} 00328 00329 curl_up = curl_easy_init(); 00330 if(curl_up) 00331 { 00332 /* we want to use our own read function */ 00333 curl_easy_setopt(curl_up, CURLOPT_READFUNCTION, HTTPConvergenceLayer_callback_read); 00334 00335 /* enable uploading */ 00336 curl_easy_setopt(curl_up, CURLOPT_UPLOAD, 1L); 00337 00338 /* HTTP PUT please */ 00339 curl_easy_setopt(curl_up, CURLOPT_PUT, 1L); 00340 00341 /* disable connection timeout */ 00342 curl_easy_setopt(curl_up, CURLOPT_CONNECTTIMEOUT, 0); 00343 00344 /* specify target URL, and note that this URL should include a file 00345 name, not only a directory */ 00346 curl_easy_setopt(curl_up, CURLOPT_URL, url_send.c_str()); 00347 00348 /* now specify which file to upload */ 00349 curl_easy_setopt(curl_up, CURLOPT_READDATA, &(*io)); 00350 00351 /* provide the size of the upload, we specicially typecast the value 00352 to curl_off_t since we must be sure to use the correct data size */ 00353 curl_easy_setopt(curl_up, CURLOPT_INFILESIZE_LARGE, 00354 (curl_off_t)length); 00355 00356 /* Now run off and do what you've been told! */ 00357 res = curl_easy_perform(curl_up); 00358 00359 if(res == CURL_CONN_OK) 00360 { 00361 /* get HTTP Header StatusCode */ 00362 curl_easy_getinfo (curl_up, CURLINFO_RESPONSE_CODE, &http_code); 00363 //curl_easy_getinfo (curl, CURLINFO_SIZE_UPLOAD, &upload_size); 00364 00365 /* DEBUG OUTPUT INFORMATION */ 00366 //std::cout << "CURL CODE : " << res << std::endl; 00367 //std::cout << "HTTP CODE : " << http_code << std::endl; 00368 //std::cout << "UPLOAD_SIZE: " << upload_size << " Bytes" << std::endl; 00369 /* DEBUG OUTPUT INFORMATION */ 00370 00371 if(http_code == HTTP_OK) 00372 { 00373 dtn::net::TransferCompletedEvent::raise(job._destination, bundle); 00374 dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED); 00375 } 00376 } 00377 00378 curl_easy_cleanup(curl_up); 00379 } 00380 } catch (const dtn::storage::BundleStorage::NoBundleFoundException&) { 00381 // send transfer aborted event 00382 dtn::net::TransferAbortedEvent::raise(node.getEID(), job._bundle, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED); 00383 } 00384 00385 } 00386 00390 dtn::core::Node::Protocol HTTPConvergenceLayer::getDiscoveryProtocol() const 00391 { 00392 return dtn::core::Node::CONN_HTTP; 00393 } 00394 00399 void HTTPConvergenceLayer::componentUp() 00400 { 00401 } 00402 00414 void HTTPConvergenceLayer::componentRun() 00415 { 00416 00417 std::string url = _server + "?eid=" + dtn::core::BundleCore::local.getString(); 00418 00419 CURL *curl_down; 00420 CURLcode res; 00421 00422 /* CURL DEBUG */ 00423 //struct data config; 00424 //config.trace_ascii = 1; /* enable ascii tracing */ 00425 00426 00427 //long http_code = 0; 00428 //double download_size = 0; 00429 //long connects = 0; 00430 00431 while (_running) 00432 { 00433 curl_down = curl_easy_init(); 00434 00435 while(curl_down) 00436 { 00437 curl_easy_setopt(curl_down, CURLOPT_URL, url.c_str()); 00438 00439 /* disable connection timeout */ 00440 curl_easy_setopt(curl_down, CURLOPT_CONNECTTIMEOUT, 0); 00441 00442 /* no progress meter please */ 00443 curl_easy_setopt(curl_down, CURLOPT_NOPROGRESS, 1L); 00444 00445 /* cURL DEBUG options */ 00446 //curl_easy_setopt(curl_down, CURLOPT_DEBUGFUNCTION, my_trace); 00447 //curl_easy_setopt(curl_down, CURLOPT_DEBUGDATA, &config); 00448 00449 //curl_easy_setopt(curl_down, CURLOPT_DEBUGFUNCTION, my_trace); 00450 //curl_easy_setopt(curl_down, CURLOPT_VERBOSE, 1); 00451 00452 // create a receiver thread 00453 { 00454 ibrcommon::MutexLock l(_push_iob_mutex); 00455 _push_iob = new ibrcommon::iobuffer(); 00456 } 00457 00458 std::auto_ptr<ibrcommon::iobuffer> auto_kill(_push_iob); 00459 std::ostream os(_push_iob); 00460 DownloadThread receiver(*_push_iob); 00461 00462 /* send all data to this function */ 00463 curl_easy_setopt(curl_down, CURLOPT_WRITEFUNCTION, HTTPConvergenceLayer_callback_write); 00464 00465 /* now specify where to write data */ 00466 curl_easy_setopt(curl_down, CURLOPT_WRITEDATA, &os); 00467 00468 /* do curl */ 00469 receiver.start(); 00470 res = curl_easy_perform(curl_down); 00471 00472 { 00473 ibrcommon::MutexLock l(_push_iob_mutex); 00474 /* finalize iobuffer */ 00475 _push_iob->finalize(); 00476 receiver.join(); 00477 _push_iob = NULL; 00478 } 00479 00480 /* get HTTP Header StatusCode */ 00481 //curl_easy_getinfo (curl_down, CURLINFO_RESPONSE_CODE, &http_code); 00482 //curl_easy_getinfo (curl, CURLINFO_SIZE_DOWNLOAD, &download_size); 00483 //curl_easy_getinfo (curl, CURLINFO_NUM_CONNECTS, &connects); 00484 00485 /* DEBUG OUTPUT INFORMATION */ 00486 //std::cout << "CURL CODE : " << res << std::endl; 00487 //std::cerr << "HTTP CODE : " << http_code << std::endl; 00488 //std::cout << "DOWNLOAD_SIZE: " << download_size << " Bytes" << std::endl; 00489 //std::cout << "NUM_CONNECTS : " << connects << " Connects" << std::endl; 00490 /* DEBUG OUTPUT INFORMATION */ 00491 00492 /* Wait some time an retry to connect */ 00493 sleep(CONN_TIMEOUT); // Wenn Verbindung nicht hergestellt werden konnte warte 5 sec. 00494 IBRCOMMON_LOGGER_DEBUG(10) << "http error: " << "Couldn't connect to server ... wait " << CONN_TIMEOUT/1000 << "s until retry" << IBRCOMMON_LOGGER_ENDL; 00495 00496 } 00497 00498 /* always cleanup */ 00499 curl_easy_cleanup(curl_down); 00500 } 00501 yield(); 00502 } 00503 00509 void HTTPConvergenceLayer::componentDown() 00510 { 00511 } 00512 00517 void HTTPConvergenceLayer::__cancellation() 00518 { 00519 _running = false; 00520 00521 ibrcommon::MutexLock l(_push_iob_mutex); 00522 if (_push_iob != NULL) 00523 { 00524 _push_iob->finalize(); 00525 _push_iob = NULL; 00526 } 00527 } 00528 00533 const std::string HTTPConvergenceLayer::getName() const 00534 { 00535 return "HTTPConvergenceLayer"; 00536 } 00537 } 00538 }