IBR-DTNSuite  0.10
OrderedStreamHandler.cpp
Go to the documentation of this file.
1 /*
2  * OrderedStreamHandler.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
23 #include "core/BundleCore.h"
25 
27 #include <ibrdtn/utils/Utils.h>
28 #include <ibrcommon/Logger.h>
29 
30 #ifdef WITH_COMPRESSION
32 #endif
33 
34 #ifdef WITH_BUNDLE_SECURITY
36 #endif
37 
38 namespace dtn
39 {
40  namespace api
41  {
43  : ProtocolHandler(client, stream), _sender(*this), _streambuf(*this), _bundlestream(&_streambuf), _group(true), _lifetime(3600)
44  {
45  _endpoint = client.getRegistration().getDefaultEID();
46  }
47 
49  {
50  _sender.stop();
51  _sender.join();
52  }
53 
55  {
57  }
58 
60  {
61  IBRCOMMON_LOGGER_DEBUG_TAG("OrderedStreamHandler", 20) << "put()" << IBRCOMMON_LOGGER_ENDL;
62 
63  // set destination EID
64  b.destination = _peer;
65 
66  // set source
67  b.source = _endpoint;
68 
69  // set lifetime
70  b.lifetime = _lifetime;
71 
72  // set flag if the bundles are addressed to a group
73  if (_group)
74  {
76  }
77  else
78  {
80  }
81 
82  // raise default bundle received event
84  }
85 
87  {
89  IBRCOMMON_LOGGER_DEBUG_TAG("OrderedStreamHandler", 20) << "get()" << IBRCOMMON_LOGGER_ENDL;
90 
91  dtn::data::MetaBundle bundle;
92 
93  while (true)
94  {
95  try {
96  bundle = reg.receiveMetaBundle();
97 
98  // discard bundles that are not from the specified peer
99  if ((!_group) && (bundle.source != _peer))
100  {
101  IBRCOMMON_LOGGER_DEBUG_TAG("OrderedStreamHandler", 30) << "get(): bundle source " << bundle.source.getString() << " not expected - discard" << IBRCOMMON_LOGGER_ENDL;
102  continue;
103  }
104 
105  break;
106  } catch (const dtn::storage::NoBundleFoundException&) {
107  IBRCOMMON_LOGGER_DEBUG_TAG("OrderedStreamHandler", 30) << "get(): no bundle found wait for notify" << IBRCOMMON_LOGGER_ENDL;
108  reg.wait_for_bundle(timeout);
109  }
110  }
111 
112  return bundle;
113  }
114 
116  {
117  // close the stream
118  _stream.close();
119  }
120 
122  {
123  IBRCOMMON_LOGGER_DEBUG_TAG("OrderedStreamHandler", 60) << "OrderedStreamHandler down" << IBRCOMMON_LOGGER_ENDL;
124 
126 
127  // close the stream
128  _stream.close();
129 
130  try {
131  // shutdown the sender thread
132  _sender.stop();
133  } catch (const std::exception&) { };
134  }
135 
137  {
138  std::string buffer;
139  _stream << ClientHandler::API_STATUS_OK << " SWITCHED TO ORDERED STREAM PROTOCOL" << std::endl;
140 
141  while (_stream.good())
142  {
143  getline(_stream, buffer);
144 
145  std::string::reverse_iterator iter = buffer.rbegin();
146  if ( (*iter) == '\r' ) buffer = buffer.substr(0, buffer.length() - 1);
147 
148  std::vector<std::string> cmd = dtn::utils::Utils::tokenize(" ", buffer);
149  if (cmd.empty()) continue;
150 
151  try {
152  if (cmd[0] == "connect")
153  {
154  _stream << ClientHandler::API_STATUS_CONTINUE << " CONNECTION ESTABLISHED" << std::endl;
155 
156  // start sender to transfer received payload to the client
157  _sender.start();
158 
159  // forward data to stream buffer
160  _bundlestream << _stream.rdbuf() << std::flush;
161  }
162  else if (cmd[0] == "set")
163  {
164  if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters");
165 
166  if (cmd[1] == "endpoint")
167  {
168  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
169 
170  _endpoint = dtn::core::BundleCore::local.add( dtn::core::BundleCore::local.getDelimiter() + cmd[2] );
171 
172  // error checking
173  if (_endpoint == dtn::data::EID())
174  {
175  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID ENDPOINT" << std::endl;
176  _endpoint = dtn::core::BundleCore::local;
177  }
178  else
179  {
180  _client.getRegistration().subscribe(_endpoint);
181  _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
182  }
183  }
184  else if (cmd[1] == "destination")
185  {
186  _peer = cmd[2];
187  _group = false;
188  _stream << ClientHandler::API_STATUS_OK << " DESTINATION CHANGED" << std::endl;
189  }
190  else if (cmd[1] == "group")
191  {
192  _peer = cmd[2];
193  _group = true;
194  _stream << ClientHandler::API_STATUS_OK << " DESTINATION GROUP CHANGED" << std::endl;
195  }
196  else if (cmd[1] == "lifetime")
197  {
198  std::stringstream ss(cmd[2]);
199  _lifetime.read(ss);
200  _stream << ClientHandler::API_STATUS_OK << " LIFETIME CHANGED" << std::endl;
201  }
202  else if (cmd[1] == "chunksize")
203  {
204  size_t size = 0;
205  std::stringstream ss(cmd[2]);
206  ss >> size;
207  _streambuf.setChunkSize(size);
208  _stream << ClientHandler::API_STATUS_OK << " CHUNKSIZE CHANGED" << std::endl;
209  }
210  else if (cmd[1] == "timeout")
211  {
212  size_t timeout = 0;
213  std::stringstream ss(cmd[2]);
214  ss >> timeout;
215  _streambuf.setTimeout(timeout);
216  _stream << ClientHandler::API_STATUS_OK << " TIMEOUT CHANGED" << std::endl;
217  }
218  else
219  {
220  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
221  }
222  }
223  else
224  {
225  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
226  }
227  } catch (const std::exception&) {
228  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " ERROR" << std::endl;
229  }
230  }
231  }
232 
233  OrderedStreamHandler::Sender::Sender(OrderedStreamHandler &conn)
234  : _handler(conn)
235  {
236  }
237 
238  OrderedStreamHandler::Sender::~Sender()
239  {
241  }
242 
243  void OrderedStreamHandler::Sender::__cancellation() throw ()
244  {
245  // cancel the main thread in here
246  _handler._client.getRegistration().abort();
247  }
248 
249  void OrderedStreamHandler::Sender::finally() throw ()
250  {
251  _handler._client.getRegistration().abort();
252  }
253 
254  void OrderedStreamHandler::Sender::run() throw ()
255  {
256  try {
257  _handler._stream << _handler._bundlestream.rdbuf() << std::flush;
258  } catch (const std::exception &ex) {
259  IBRCOMMON_LOGGER_DEBUG_TAG("OrderedStreamHandler", 10) << "unexpected API error! " << ex.what() << IBRCOMMON_LOGGER_ENDL;
260  }
261  }
262  } /* namespace api */
263 } /* namespace dtn */