IBR-DTNSuite
0.8
|
00001 /* 00002 * APIClient.cpp 00003 * 00004 * Created on: 01.07.2011 00005 * Author: morgenro 00006 */ 00007 00008 #include "ibrdtn/api/APIClient.h" 00009 #include "ibrdtn/data/Bundle.h" 00010 #include "ibrdtn/api/PlainSerializer.h" 00011 #include "ibrdtn/utils/Utils.h" 00012 #include <ibrcommon/Logger.h> 00013 00014 #include <iostream> 00015 00016 namespace dtn 00017 { 00018 namespace api 00019 { 00020 APIClient::Message::Message(STATUS_CODES c, const std::string &m) 00021 : code(c), msg(m) 00022 { 00023 } 00024 00025 APIClient::Message::~Message() 00026 {} 00027 00028 APIClient::APIClient(ibrcommon::tcpstream &stream) 00029 : _stream(stream), _get_busy(false) 00030 { 00031 } 00032 00033 APIClient::~APIClient() 00034 { 00035 } 00036 00037 void APIClient::connect() 00038 { 00039 // receive connection banner 00040 std::string banner; 00041 getline(_stream, banner); 00042 } 00043 00044 APIClient::Message APIClient::__get_message() 00045 { 00046 std::string buffer; 00047 std::stringstream ss; 00048 int code = 0; 00049 00050 if (_stream.eof()) throw ibrcommon::Exception("connection closed"); 00051 00052 getline(_stream, buffer); 00053 ss.clear(); 00054 ss.str(buffer); 00055 ss >> code; 00056 00057 return Message(APIClient::STATUS_CODES(code), buffer); 00058 } 00059 00060 int APIClient::__get_return() 00061 { 00062 try { 00063 ibrcommon::MutexLock l(_queue_cond); 00064 00065 while (_status_queue.empty()) 00066 { 00067 if (_get_busy) 00068 { 00069 _queue_cond.wait(); 00070 } 00071 else 00072 { 00073 _get_busy = true; 00074 throw true; 00075 } 00076 } 00077 00078 // queue has an element... get it 00079 Message m = _status_queue.front(); 00080 _status_queue.pop(); 00081 00082 return m.code; 00083 } catch (bool) { 00084 while (true) 00085 { 00086 Message m = __get_message(); 00087 00088 ibrcommon::MutexLock l(_queue_cond); 00089 if ((m.code >= 600) && (m.code < 700)) 00090 { 00091 _notify_queue.push(m); 00092 _queue_cond.signal(true); 00093 } 00094 else 00095 { 00096 _get_busy = false; 00097 _queue_cond.signal(true); 00098 return m.code; 00099 } 00100 } 00101 } 00102 } 00103 00104 void APIClient::setEndpoint(const std::string &endpoint) 00105 { 00106 _stream << "set endpoint " << endpoint << std::endl; 00107 if (__get_return() != API_STATUS_ACCEPTED) 00108 { 00109 // error 00110 throw ibrcommon::Exception("endpoint invalid"); 00111 } 00112 } 00113 00114 void APIClient::subscribe(const dtn::data::EID &eid) 00115 { 00116 _stream << "registration add " << eid.getString() << std::endl; 00117 if (__get_return() != API_STATUS_ACCEPTED) 00118 { 00119 // error 00120 throw ibrcommon::Exception("eid invalid"); 00121 } 00122 } 00123 00124 void APIClient::unsubscribe(const dtn::data::EID &eid) 00125 { 00126 _stream << "registration del " << eid.getString() << std::endl; 00127 if (__get_return() != API_STATUS_ACCEPTED) 00128 { 00129 // error 00130 throw ibrcommon::Exception("eid invalid"); 00131 } 00132 } 00133 00134 std::list<dtn::data::EID> APIClient::getSubscriptions() 00135 { 00136 std::list<dtn::data::EID> sublst; 00137 _stream << "registration list" << std::endl; 00138 00139 if (__get_return() == API_STATUS_OK) 00140 { 00141 std::string buffer; 00142 00143 while (!_stream.eof()) 00144 { 00145 getline(_stream, buffer); 00146 00147 // end of list? 00148 if (buffer.size() == 0) break; 00149 00150 // add eid to the list 00151 sublst.push_back(buffer); 00152 } 00153 } 00154 else 00155 { 00156 // error 00157 throw ibrcommon::Exception("get failed"); 00158 } 00159 00160 return sublst; 00161 } 00162 00163 dtn::api::Bundle APIClient::get(dtn::data::BundleID &id) 00164 { 00165 _stream << "bundle load " << id.timestamp << " " << id.sequencenumber << " "; 00166 00167 if (id.fragment) 00168 { 00169 _stream << id.offset << " "; 00170 } 00171 00172 _stream << id.source.getString() << std::endl; 00173 00174 switch (__get_return()) 00175 { 00176 case API_STATUS_NOT_FOUND: 00177 throw ibrcommon::Exception("bundle not found"); 00178 00179 case API_STATUS_OK: 00180 break; 00181 00182 default: 00183 throw ibrcommon::Exception("load error"); 00184 } 00185 00186 return register_get(); 00187 } 00188 00189 dtn::api::Bundle APIClient::get() 00190 { 00191 _stream << "bundle load queue" << std::endl; 00192 00193 switch (__get_return()) 00194 { 00195 case API_STATUS_NOT_FOUND: 00196 throw ibrcommon::Exception("bundle not found"); 00197 00198 case API_STATUS_OK: 00199 break; 00200 00201 default: 00202 throw ibrcommon::Exception("load error"); 00203 } 00204 00205 return register_get(); 00206 } 00207 00208 void APIClient::send(const dtn::api::Bundle &bundle) 00209 { 00210 register_put(bundle._b); 00211 register_send(); 00212 } 00213 00214 void APIClient::notify_delivered(const dtn::api::Bundle &b) 00215 { 00216 dtn::data::BundleID id(b._b); 00217 notify_delivered(id); 00218 } 00219 00220 void APIClient::notify_delivered(const dtn::data::BundleID &id) 00221 { 00222 _stream << "bundle delivered " << id.timestamp << " " << id.sequencenumber << " "; 00223 00224 if (id.fragment) 00225 { 00226 _stream << id.offset << " "; 00227 } 00228 00229 _stream << id.source.getString() << std::endl; 00230 00231 switch (__get_return()) 00232 { 00233 case API_STATUS_NOT_FOUND: 00234 throw ibrcommon::Exception("bundle not found"); 00235 00236 case API_STATUS_OK: 00237 break; 00238 00239 default: 00240 throw ibrcommon::Exception("notify delivered error"); 00241 } 00242 } 00243 00244 APIClient::Message APIClient::wait() 00245 { 00246 try { 00247 ibrcommon::MutexLock l(_queue_cond); 00248 00249 while (_notify_queue.empty()) 00250 { 00251 if (_get_busy) 00252 { 00253 _queue_cond.wait(); 00254 } 00255 else 00256 { 00257 _get_busy = true; 00258 throw true; 00259 } 00260 } 00261 00262 // queue has an element... get it 00263 Message m = _notify_queue.front(); 00264 _notify_queue.pop(); 00265 00266 return m; 00267 } catch (bool) { 00268 while (true) 00269 { 00270 Message m = __get_message(); 00271 00272 ibrcommon::MutexLock l(_queue_cond); 00273 if ((m.code < 600) || (m.code >= 700)) 00274 { 00275 _status_queue.push(m); 00276 _queue_cond.signal(true); 00277 } 00278 else 00279 { 00280 _get_busy = false; 00281 _queue_cond.signal(true); 00282 return m; 00283 } 00284 } 00285 } 00286 } 00287 00288 void APIClient::unblock_wait() 00289 { 00290 ibrcommon::MutexLock l(_queue_cond); 00291 _queue_cond.abort(); 00292 } 00293 00294 void APIClient::register_put(const dtn::data::Bundle &bundle) 00295 { 00296 _stream << "bundle put plain" << std::endl; 00297 00298 switch (__get_return()) 00299 { 00300 case API_STATUS_NOT_ACCEPTABLE: 00301 throw ibrcommon::Exception("not acceptable"); 00302 00303 case API_STATUS_CONTINUE: 00304 break; 00305 00306 default: 00307 throw ibrcommon::Exception("put error"); 00308 } 00309 00310 dtn::api::PlainSerializer(_stream) << bundle; _stream << std::flush; 00311 00312 switch (__get_return()) 00313 { 00314 case API_STATUS_NOT_ACCEPTABLE: 00315 throw ibrcommon::Exception("not acceptable"); 00316 00317 case API_STATUS_OK: 00318 break; 00319 00320 default: 00321 throw ibrcommon::Exception("put error"); 00322 } 00323 } 00324 00325 void APIClient::register_clear() 00326 { 00327 _stream << "bundle clear" << std::endl; 00328 00329 switch (__get_return()) 00330 { 00331 case API_STATUS_OK: 00332 break; 00333 00334 default: 00335 throw ibrcommon::Exception("clear error"); 00336 } 00337 } 00338 00339 void APIClient::register_free() 00340 { 00341 _stream << "bundle free" << std::endl; 00342 00343 switch (__get_return()) 00344 { 00345 case API_STATUS_OK: 00346 break; 00347 00348 default: 00349 throw ibrcommon::Exception("free error"); 00350 } 00351 } 00352 00353 void APIClient::register_store() 00354 { 00355 _stream << "bundle store" << std::endl; 00356 00357 switch (__get_return()) 00358 { 00359 case API_STATUS_OK: 00360 break; 00361 00362 default: 00363 throw ibrcommon::Exception("store error"); 00364 } 00365 } 00366 00367 void APIClient::register_send() 00368 { 00369 _stream << "bundle send" << std::endl; 00370 00371 switch (__get_return()) 00372 { 00373 case API_STATUS_OK: 00374 break; 00375 00376 default: 00377 throw ibrcommon::Exception("send error"); 00378 } 00379 } 00380 00381 dtn::data::Bundle APIClient::register_get() 00382 { 00383 dtn::data::Bundle ret; 00384 _stream << "bundle get plain" << std::endl; 00385 00386 switch (__get_return()) 00387 { 00388 case API_STATUS_OK: 00389 dtn::api::PlainDeserializer(_stream) >> ret; 00390 break; 00391 00392 default: 00393 throw ibrcommon::Exception("get error"); 00394 } 00395 00396 return ret; 00397 } 00398 00399 dtn::data::BundleID APIClient::readBundleID(const std::string &data) 00400 { 00401 std::vector<std::string> vdata = dtn::utils::Utils::tokenize(" ", data); 00402 00403 // load bundle id 00404 std::stringstream ss; 00405 size_t timestamp = 0; 00406 size_t sequencenumber = 0; 00407 bool fragment = false; 00408 size_t offset = 0; 00409 00410 // read timestamp 00411 ss.clear(); ss.str(vdata[0]); ss >> timestamp; 00412 00413 // read sequence number 00414 ss.clear(); ss.str(vdata[1]); ss >> sequencenumber; 00415 00416 // read fragment offset 00417 if (vdata.size() > 3) 00418 { 00419 fragment = true; 00420 00421 // read sequence number 00422 ss.clear(); ss.str(vdata[2]); ss >> offset; 00423 } 00424 00425 // read EID 00426 ss.clear(); dtn::data::EID eid(vdata[vdata.size() - 1]); 00427 00428 // construct bundle id 00429 return dtn::data::BundleID(eid, timestamp, sequencenumber, fragment, offset); 00430 } 00431 } 00432 }