IBR-DTNSuite  0.8
ibrdtn/ibrdtn/api/APIClient.cpp
Go to the documentation of this file.
00001 /*
00002  * APIClient.cpp
00003  *
00004  *  Created on: 01.07.2011
00005  *      Author: morgenro
00006  */
00007 
00008 #include "ibrdtn/api/APIClient.h"
00009 #include "ibrdtn/data/Bundle.h"
00010 #include "ibrdtn/api/PlainSerializer.h"
00011 #include "ibrdtn/utils/Utils.h"
00012 #include <ibrcommon/Logger.h>
00013 
00014 #include <iostream>
00015 
00016 namespace dtn
00017 {
00018         namespace api
00019         {
00020                 APIClient::Message::Message(STATUS_CODES c, const std::string &m)
00021                  : code(c), msg(m)
00022                 {
00023                 }
00024 
00025                 APIClient::Message::~Message()
00026                 {}
00027 
00028                 APIClient::APIClient(ibrcommon::tcpstream &stream)
00029                   : _stream(stream), _get_busy(false)
00030                 {
00031                 }
00032 
00033                 APIClient::~APIClient()
00034                 {
00035                 }
00036 
00037                 void APIClient::connect()
00038                 {
00039                         // receive connection banner
00040                         std::string banner;
00041                         getline(_stream, banner);
00042                 }
00043 
00044                 APIClient::Message APIClient::__get_message()
00045                 {
00046                         std::string buffer;
00047                         std::stringstream ss;
00048                         int code = 0;
00049 
00050                         if (_stream.eof()) throw ibrcommon::Exception("connection closed");
00051 
00052                         getline(_stream, buffer);
00053                         ss.clear();
00054                         ss.str(buffer);
00055                         ss >> code;
00056 
00057                         return Message(APIClient::STATUS_CODES(code), buffer);
00058                 }
00059 
00060                 int APIClient::__get_return()
00061                 {
00062                         try {
00063                                 ibrcommon::MutexLock l(_queue_cond);
00064 
00065                                 while (_status_queue.empty())
00066                                 {
00067                                         if (_get_busy)
00068                                         {
00069                                                 _queue_cond.wait();
00070                                         }
00071                                         else
00072                                         {
00073                                                 _get_busy = true;
00074                                                 throw true;
00075                                         }
00076                                 }
00077 
00078                                 // queue has an element... get it
00079                                 Message m = _status_queue.front();
00080                                 _status_queue.pop();
00081 
00082                                 return m.code;
00083                         } catch (bool) {
00084                                 while (true)
00085                                 {
00086                                         Message m = __get_message();
00087 
00088                                         ibrcommon::MutexLock l(_queue_cond);
00089                                         if ((m.code >= 600) && (m.code < 700))
00090                                         {
00091                                                 _notify_queue.push(m);
00092                                                 _queue_cond.signal(true);
00093                                         }
00094                                         else
00095                                         {
00096                                                 _get_busy = false;
00097                                                 _queue_cond.signal(true);
00098                                                 return m.code;
00099                                         }
00100                                 }
00101                         }
00102                 }
00103 
00104                 void APIClient::setEndpoint(const std::string &endpoint)
00105                 {
00106                         _stream << "set endpoint " << endpoint << std::endl;
00107                         if (__get_return() != API_STATUS_ACCEPTED)
00108                         {
00109                                 // error
00110                                 throw ibrcommon::Exception("endpoint invalid");
00111                         }
00112                 }
00113 
00114                 void APIClient::subscribe(const dtn::data::EID &eid)
00115                 {
00116                         _stream << "registration add " << eid.getString() << std::endl;
00117                         if (__get_return() != API_STATUS_ACCEPTED)
00118                         {
00119                                 // error
00120                                 throw ibrcommon::Exception("eid invalid");
00121                         }
00122                 }
00123 
00124                 void APIClient::unsubscribe(const dtn::data::EID &eid)
00125                 {
00126                         _stream << "registration del " << eid.getString() << std::endl;
00127                         if (__get_return() != API_STATUS_ACCEPTED)
00128                         {
00129                                 // error
00130                                 throw ibrcommon::Exception("eid invalid");
00131                         }
00132                 }
00133 
00134                 std::list<dtn::data::EID> APIClient::getSubscriptions()
00135                 {
00136                         std::list<dtn::data::EID> sublst;
00137                         _stream << "registration list" << std::endl;
00138 
00139                         if (__get_return() == API_STATUS_OK)
00140                         {
00141                                 std::string buffer;
00142 
00143                                 while (!_stream.eof())
00144                                 {
00145                                         getline(_stream, buffer);
00146 
00147                                         // end of list?
00148                                         if (buffer.size() == 0) break;
00149 
00150                                         // add eid to the list
00151                                         sublst.push_back(buffer);
00152                                 }
00153                         }
00154                         else
00155                         {
00156                                 // error
00157                                 throw ibrcommon::Exception("get failed");
00158                         }
00159 
00160                         return sublst;
00161                 }
00162 
00163                 dtn::api::Bundle APIClient::get(dtn::data::BundleID &id)
00164                 {
00165                         _stream << "bundle load " << id.timestamp << " " << id.sequencenumber << " ";
00166 
00167                         if (id.fragment)
00168                         {
00169                                 _stream << id.offset << " ";
00170                         }
00171 
00172                         _stream << id.source.getString() << std::endl;
00173 
00174                         switch (__get_return())
00175                         {
00176                         case API_STATUS_NOT_FOUND:
00177                                 throw ibrcommon::Exception("bundle not found");
00178 
00179                         case API_STATUS_OK:
00180                                 break;
00181 
00182                         default:
00183                                 throw ibrcommon::Exception("load error");
00184                         }
00185 
00186                         return register_get();
00187                 }
00188 
00189                 dtn::api::Bundle APIClient::get()
00190                 {
00191                         _stream << "bundle load queue" << std::endl;
00192 
00193                         switch (__get_return())
00194                         {
00195                         case API_STATUS_NOT_FOUND:
00196                                 throw ibrcommon::Exception("bundle not found");
00197 
00198                         case API_STATUS_OK:
00199                                 break;
00200 
00201                         default:
00202                                 throw ibrcommon::Exception("load error");
00203                         }
00204 
00205                         return register_get();
00206                 }
00207 
00208                 void APIClient::send(const dtn::api::Bundle &bundle)
00209                 {
00210                         register_put(bundle._b);
00211                         register_send();
00212                 }
00213 
00214                 void APIClient::notify_delivered(const dtn::api::Bundle &b)
00215                 {
00216                         dtn::data::BundleID id(b._b);
00217                         notify_delivered(id);
00218                 }
00219 
00220                 void APIClient::notify_delivered(const dtn::data::BundleID &id)
00221                 {
00222                         _stream << "bundle delivered " << id.timestamp << " " << id.sequencenumber << " ";
00223 
00224                         if (id.fragment)
00225                         {
00226                                 _stream << id.offset << " ";
00227                         }
00228 
00229                         _stream << id.source.getString() << std::endl;
00230 
00231                         switch (__get_return())
00232                         {
00233                         case API_STATUS_NOT_FOUND:
00234                                 throw ibrcommon::Exception("bundle not found");
00235 
00236                         case API_STATUS_OK:
00237                                 break;
00238 
00239                         default:
00240                                 throw ibrcommon::Exception("notify delivered error");
00241                         }
00242                 }
00243 
00244                 APIClient::Message APIClient::wait()
00245                 {
00246                         try {
00247                                 ibrcommon::MutexLock l(_queue_cond);
00248 
00249                                 while (_notify_queue.empty())
00250                                 {
00251                                         if (_get_busy)
00252                                         {
00253                                                 _queue_cond.wait();
00254                                         }
00255                                         else
00256                                         {
00257                                                 _get_busy = true;
00258                                                 throw true;
00259                                         }
00260                                 }
00261 
00262                                 // queue has an element... get it
00263                                 Message m = _notify_queue.front();
00264                                 _notify_queue.pop();
00265 
00266                                 return m;
00267                         } catch (bool) {
00268                                 while (true)
00269                                 {
00270                                         Message m = __get_message();
00271 
00272                                         ibrcommon::MutexLock l(_queue_cond);
00273                                         if ((m.code < 600) || (m.code >= 700))
00274                                         {
00275                                                 _status_queue.push(m);
00276                                                 _queue_cond.signal(true);
00277                                         }
00278                                         else
00279                                         {
00280                                                 _get_busy = false;
00281                                                 _queue_cond.signal(true);
00282                                                 return m;
00283                                         }
00284                                 }
00285                         }
00286                 }
00287 
00288                 void APIClient::unblock_wait()
00289                 {
00290                         ibrcommon::MutexLock l(_queue_cond);
00291                         _queue_cond.abort();
00292                 }
00293 
00294                 void APIClient::register_put(const dtn::data::Bundle &bundle)
00295                 {
00296                         _stream << "bundle put plain" << std::endl;
00297 
00298                         switch (__get_return())
00299                         {
00300                         case API_STATUS_NOT_ACCEPTABLE:
00301                                 throw ibrcommon::Exception("not acceptable");
00302 
00303                         case API_STATUS_CONTINUE:
00304                                 break;
00305 
00306                         default:
00307                                 throw ibrcommon::Exception("put error");
00308                         }
00309 
00310                         dtn::api::PlainSerializer(_stream) << bundle; _stream << std::flush;
00311 
00312                         switch (__get_return())
00313                         {
00314                         case API_STATUS_NOT_ACCEPTABLE:
00315                                 throw ibrcommon::Exception("not acceptable");
00316 
00317                         case API_STATUS_OK:
00318                                 break;
00319 
00320                         default:
00321                                 throw ibrcommon::Exception("put error");
00322                         }
00323                 }
00324 
00325                 void APIClient::register_clear()
00326                 {
00327                         _stream << "bundle clear" << std::endl;
00328 
00329                         switch (__get_return())
00330                         {
00331                         case API_STATUS_OK:
00332                                 break;
00333 
00334                         default:
00335                                 throw ibrcommon::Exception("clear error");
00336                         }
00337                 }
00338 
00339                 void APIClient::register_free()
00340                 {
00341                         _stream << "bundle free" << std::endl;
00342 
00343                         switch (__get_return())
00344                         {
00345                         case API_STATUS_OK:
00346                                 break;
00347 
00348                         default:
00349                                 throw ibrcommon::Exception("free error");
00350                         }
00351                 }
00352 
00353                 void APIClient::register_store()
00354                 {
00355                         _stream << "bundle store" << std::endl;
00356 
00357                         switch (__get_return())
00358                         {
00359                         case API_STATUS_OK:
00360                                 break;
00361 
00362                         default:
00363                                 throw ibrcommon::Exception("store error");
00364                         }
00365                 }
00366 
00367                 void APIClient::register_send()
00368                 {
00369                         _stream << "bundle send" << std::endl;
00370 
00371                         switch (__get_return())
00372                         {
00373                         case API_STATUS_OK:
00374                                 break;
00375 
00376                         default:
00377                                 throw ibrcommon::Exception("send error");
00378                         }
00379                 }
00380 
00381                 dtn::data::Bundle APIClient::register_get()
00382                 {
00383                         dtn::data::Bundle ret;
00384                         _stream << "bundle get plain" << std::endl;
00385 
00386                         switch (__get_return())
00387                         {
00388                         case API_STATUS_OK:
00389                                 dtn::api::PlainDeserializer(_stream) >> ret;
00390                                 break;
00391 
00392                         default:
00393                                 throw ibrcommon::Exception("get error");
00394                         }
00395 
00396                         return ret;
00397                 }
00398 
00399                 dtn::data::BundleID APIClient::readBundleID(const std::string &data)
00400                 {
00401                         std::vector<std::string> vdata = dtn::utils::Utils::tokenize(" ", data);
00402 
00403                         // load bundle id
00404                         std::stringstream ss;
00405                         size_t timestamp = 0;
00406                         size_t sequencenumber = 0;
00407                         bool fragment = false;
00408                         size_t offset = 0;
00409 
00410                         // read timestamp
00411                          ss.clear(); ss.str(vdata[0]); ss >> timestamp;
00412 
00413                         // read sequence number
00414                          ss.clear(); ss.str(vdata[1]); ss >> sequencenumber;
00415 
00416                         // read fragment offset
00417                         if (vdata.size() > 3)
00418                         {
00419                                 fragment = true;
00420 
00421                                 // read sequence number
00422                                  ss.clear(); ss.str(vdata[2]); ss >> offset;
00423                         }
00424 
00425                         // read EID
00426                          ss.clear(); dtn::data::EID eid(vdata[vdata.size() - 1]);
00427 
00428                         // construct bundle id
00429                         return dtn::data::BundleID(eid, timestamp, sequencenumber, fragment, offset);
00430                 }
00431         }
00432 }