IBR-DTNSuite  0.12
OrderedStreamHandler.cpp
Go to the documentation of this file.
1 /*
2  * OrderedStreamHandler.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
23 #include "core/BundleCore.h"
25 
27 #include <ibrdtn/utils/Utils.h>
28 #include <ibrcommon/Logger.h>
29 
30 #ifdef WITH_BUNDLE_SECURITY
32 #endif
33 
34 namespace dtn
35 {
36  namespace api
37  {
// Constructor initializer list and body; the signature line is absent from this listing.
// The base ProtocolHandler receives client and stream, _sender and _streambuf are
// constructed from *this, _bundlestream is attached to _streambuf, group addressing
// starts enabled and the lifetime starts at 3600.
39  : ProtocolHandler(client, stream), _sender(*this), _streambuf(*this), _bundlestream(&_streambuf), _group(true), _lifetime(3600)
40  {
// the default EID of the client's registration is the initial local endpoint
41  _endpoint = client.getRegistration().getDefaultEID();
42  }
43 
// Body of a member whose signature line is absent from this listing.
// NOTE(review): presumably the destructor - confirm against the original file.
45  {
// request the sender to stop, then join it before leaving this scope
46  _sender.stop();
47  _sender.join();
48  }
49 
// NOTE(review): the signature (listing line 50) and the only body line (listing line 52)
// of this member are absent from this listing; restore them from the original file
// before relying on this block.
51  {
53  }
54 
// Body of put() (name taken from the debug message below); the signature line is
// absent from this listing. Stamps the bundle b with the configured peer, local
// endpoint and lifetime before handing it on.
// NOTE(review): listing lines 71, 75 and 79 (the statements inside both branches and
// the final hand-over of the bundle) are absent from this listing.
56  {
57  IBRCOMMON_LOGGER_DEBUG_TAG("OrderedStreamHandler", 20) << "put()" << IBRCOMMON_LOGGER_ENDL;
58 
59  // set destination EID
60  b.destination = _peer;
61 
62  // set source
63  b.source = _endpoint;
64 
65  // set lifetime
66  b.lifetime = _lifetime;
67 
68  // set flag if the bundles are addressed to a group
69  if (_group)
70  {
72  }
73  else
74  {
76  }
77 
78  // raise default bundle received event
80  }
81 
// Body of get() (name taken from the debug message below); the signature line and
// listing line 84 are absent from this listing, so the declarations of `timeout`
// and `reg` are not visible here.
// Loops until a meta bundle is accepted and returns it. While no bundle is
// available the loop waits on reg.wait_for_bundle(timeout) and then tries again.
83  {
85  IBRCOMMON_LOGGER_DEBUG_TAG("OrderedStreamHandler", 20) << "get()" << IBRCOMMON_LOGGER_ENDL;
86 
87  dtn::data::MetaBundle bundle;
88 
89  while (true)
90  {
91  try {
92  bundle = reg.receiveMetaBundle();
93 
94  // when not in group mode, discard bundles that do not come from the specified peer
95  if ((!_group) && (bundle.source != _peer))
96  {
97  IBRCOMMON_LOGGER_DEBUG_TAG("OrderedStreamHandler", 30) << "get(): bundle source " << bundle.source.getString() << " not expected - discard" << IBRCOMMON_LOGGER_ENDL;
98  continue;
99  }
100 
// an acceptable bundle was received - leave the loop and return it
101  break;
102  } catch (const dtn::storage::NoBundleFoundException&) {
103  IBRCOMMON_LOGGER_DEBUG_TAG("OrderedStreamHandler", 30) << "get(): no bundle found wait for notify" << IBRCOMMON_LOGGER_ENDL;
// wait for a bundle; the enclosing loop then retries the receive
104  reg.wait_for_bundle(timeout);
105  }
106  }
107 
108  return bundle;
109  }
110 
// Body of a member whose signature line is absent from this listing.
// Its only action is closing the client stream.
112  {
113  // close the stream
114  _stream.close();
115  }
116 
// Shutdown sequence of the handler; the signature line and listing line 121 are
// absent from this listing. Logs that the handler goes down, closes the client
// stream and stops the sender.
118  {
119  IBRCOMMON_LOGGER_DEBUG_TAG("OrderedStreamHandler", 60) << "OrderedStreamHandler down" << IBRCOMMON_LOGGER_ENDL;
120 
122 
123  // close the stream
124  _stream.close();
125 
126  try {
127  // shutdown the sender thread
128  _sender.stop();
// a std::exception raised while stopping the sender is ignored here
129  } catch (const std::exception&) { };
130  }
131 
133  {
134  std::string buffer;
135  _stream << ClientHandler::API_STATUS_OK << " SWITCHED TO ORDERED STREAM PROTOCOL" << std::endl;
136 
137  while (_stream.good())
138  {
139  getline(_stream, buffer);
140 
141  std::string::reverse_iterator iter = buffer.rbegin();
142  if ( (*iter) == '\r' ) buffer = buffer.substr(0, buffer.length() - 1);
143 
144  std::vector<std::string> cmd = dtn::utils::Utils::tokenize(" ", buffer);
145  if (cmd.empty()) continue;
146 
147  try {
148  if (cmd[0] == "connect")
149  {
150  _stream << ClientHandler::API_STATUS_CONTINUE << " CONNECTION ESTABLISHED" << std::endl;
151 
152  // start sender to transfer received payload to the client
153  _sender.start();
154 
155  // forward data to stream buffer
156  _bundlestream << _stream.rdbuf() << std::flush;
157  }
158  else if (cmd[0] == "set")
159  {
160  if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters");
161 
162  if (cmd[1] == "endpoint")
163  {
164  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
165 
166  // error checking
167  if (cmd[2].length() <= 0)
168  {
169  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID ENDPOINT" << std::endl;
170  }
171  else
172  {
173  // unsubscribe from old endpoint
174  _client.getRegistration().unsubscribe(_endpoint);
175 
176  // set new application endpoint
177  _endpoint.setApplication(cmd[2]);
178 
179  // subscribe to new endpoint
180  _client.getRegistration().subscribe(_endpoint);
181 
182  _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
183  }
184  }
185  else if (cmd[1] == "destination")
186  {
187  _peer = cmd[2];
188  _group = false;
189  _stream << ClientHandler::API_STATUS_OK << " DESTINATION CHANGED" << std::endl;
190  }
191  else if (cmd[1] == "group")
192  {
193  _peer = cmd[2];
194  _group = true;
195  _stream << ClientHandler::API_STATUS_OK << " DESTINATION GROUP CHANGED" << std::endl;
196  }
197  else if (cmd[1] == "lifetime")
198  {
199  std::stringstream ss(cmd[2]);
200  _lifetime.read(ss);
201  _stream << ClientHandler::API_STATUS_OK << " LIFETIME CHANGED" << std::endl;
202  }
203  else if (cmd[1] == "chunksize")
204  {
205  size_t size = 0;
206  std::stringstream ss(cmd[2]);
207  ss >> size;
208  _streambuf.setChunkSize(size);
209  _stream << ClientHandler::API_STATUS_OK << " CHUNKSIZE CHANGED" << std::endl;
210  }
211  else if (cmd[1] == "timeout")
212  {
213  size_t timeout = 0;
214  std::stringstream ss(cmd[2]);
215  ss >> timeout;
216  _streambuf.setTimeout(timeout);
217  _stream << ClientHandler::API_STATUS_OK << " TIMEOUT CHANGED" << std::endl;
218  }
219  else
220  {
221  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
222  }
223  }
224  else
225  {
226  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
227  }
228  } catch (const std::exception&) {
229  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " ERROR" << std::endl;
230  }
231  }
232  }
233 
// Creates the sender helper for the given handler; conn initializes _handler,
// through which the other Sender members reach the handler's client and streams.
234  OrderedStreamHandler::Sender::Sender(OrderedStreamHandler &conn)
235  : _handler(conn)
236  {
237  }
238 
// Destructor of the sender helper.
// NOTE(review): the only body line (listing line 241) is absent from this listing;
// restore it from the original file.
239  OrderedStreamHandler::Sender::~Sender()
240  {
242  }
243 
// Cancellation hook of the sender: aborts the registration of the handler's client.
// Declared throw(), so no exception may leave this function.
244  void OrderedStreamHandler::Sender::__cancellation() throw ()
245  {
246  // cancel the main thread in here
247  _handler._client.getRegistration().abort();
248  }
249 
// Clean-up hook of the sender: performs the same registration abort as __cancellation().
// Declared throw(), so no exception may leave this function.
250  void OrderedStreamHandler::Sender::finally() throw ()
251  {
252  _handler._client.getRegistration().abort();
253  }
254 
255  void OrderedStreamHandler::Sender::run() throw ()
256  {
257  try {
258  _handler._stream << _handler._bundlestream.rdbuf() << std::flush;
259  } catch (const std::exception &ex) {
260  IBRCOMMON_LOGGER_DEBUG_TAG("OrderedStreamHandler", 10) << "unexpected API error! " << ex.what() << IBRCOMMON_LOGGER_ENDL;
261  }
262  }
263  } /* namespace api */
264 } /* namespace dtn */