IBR-DTNSuite  0.10
BinaryStreamClient.cpp
Go to the documentation of this file.
1 /*
2  * BinaryStreamClient.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "config.h"
23 #include "Configuration.h"
24 #include "api/BinaryStreamClient.h"
25 #include "core/GlobalEvent.h"
26 #include "core/BundleCore.h"
27 #include "core/BundleEvent.h"
29 #include <ibrdtn/data/Serializer.h>
30 #include <iostream>
31 #include <ibrcommon/Logger.h>
32 
33 namespace dtn
34 {
35  namespace api
36  {
38  : ProtocolHandler(client, stream), _sender(*this), _connection(*this, _stream), _lastack(0)
39  {
40  }
41 
43  {
45  _sender.join();
46  }
47 
49  {
50  return _eid;
51  }
52 
54  {
55  }
56 
58  {
59  }
60 
62  {
63  }
64 
66  {
67  Registration &reg = _client.getRegistration();
68 
69  if (header._localeid.isNone())
70  {
71  // create an EID based on the registration handle
72  _eid = reg.getDefaultEID();
73  }
74  else
75  {
76  // contact received event
77  _eid = BundleCore::local.add( BundleCore::local.getDelimiter() + header._localeid.getSSP() );
78  }
79 
80  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 20) << "new client connected, handle: " << reg.getHandle() << "; eid: "<< _eid.getString() << IBRCOMMON_LOGGER_ENDL;
81 
82  reg.subscribe(_eid);
83  }
84 
86  {
87  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 40) << "BinaryStreamClient::eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL;
88 
90 
91  try {
92  // stop the sender
93  _sender.stop();
94  } catch (const ibrcommon::ThreadException &ex) {
95  IBRCOMMON_LOGGER_TAG("BinaryStreamClient", error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
96  }
97  }
98 
100  {
101  try {
102  const dtn::data::Bundle bundle = _sentqueue.getnpop();
103 
104  // set ACK to zero
105  _lastack = 0;
106 
107  } catch (const ibrcommon::QueueUnblockedException&) {
108  // pop on empty queue!
109  }
110  }
111 
113  {
114  try {
115  const dtn::data::Bundle bundle = _sentqueue.getnpop();
116 
117  // notify bundle as delivered
119 
120  // set ACK to zero
121  _lastack = 0;
122  } catch (const ibrcommon::QueueUnblockedException&) {
123  // pop on empty queue!
124  }
125  }
126 
128  {
129  _lastack = ack;
130  }
131 
133  {
134  // shutdown
136 
137  // close the stream
138  _stream.close();
139  }
140 
142  {
143  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 60) << "BinaryStreamClient down" << IBRCOMMON_LOGGER_ENDL;
144 
145  // abort blocking registrations
147 
148  // close the stream
149  _stream.close();
150 
151  try {
152  // shutdown the sender thread
153  _sender.stop();
154  } catch (const std::exception&) { };
155  }
156 
158  {
159  try {
160  char flags = 0;
161 
162  // request acknowledgements
164 
165  // do the handshake
166  _connection.handshake(dtn::core::BundleCore::local, 10, flags);
167 
168  // start the sender thread
169  _sender.start();
170 
171  while (_connection.good())
172  {
173  dtn::data::Bundle bundle;
174  dtn::data::DefaultDeserializer(_connection) >> bundle;
175 
176  // create a new sequence number
177  bundle.relabel();
178 
179  // process the new bundle
181  }
182  } catch (const ibrcommon::ThreadException &ex) {
183  IBRCOMMON_LOGGER_TAG("BinaryStreamClient", error) << "failed to start thread: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
185  } catch (const dtn::SerializationFailedException &ex) {
186  IBRCOMMON_LOGGER_TAG("BinaryStreamClient", error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
188  } catch (const ibrcommon::IOException &ex) {
189  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
191  } catch (const dtn::InvalidDataException &ex) {
192  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
194  } catch (const std::exception &ex) {
195  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
197  }
198  }
199 
201  {
202  return _stream.good();
203  }
204 
205  BinaryStreamClient::Sender::Sender(BinaryStreamClient &client)
206  : _client(client)
207  {
208  }
209 
// Destructor of the sender helper.
// NOTE(review): the listing numbering jumps from 211 to 213, so a statement
// may have been dropped from this body in the extract - confirm against the
// repository before relying on the destructor being empty.
210  BinaryStreamClient::Sender::~Sender()
211  {
213  }
214 
215  void BinaryStreamClient::Sender::__cancellation() throw ()
216  {
217  // cancel the main thread in here
218  this->abort();
219 
220  // abort all blocking calls on the registration object
221  _client._client.getRegistration().abort();
222  }
223 
// Main loop of the sender: fetches bundles from the client's registration and
// writes them to the client's connection for as long as _client.good() holds.
//
// Per iteration a bundle is taken with reg.receive(), stored in
// _client._sentqueue, serialized onto _client._connection and the connection is
// flushed. If no bundle is available (NoBundleFoundException) the loop blocks in
// reg.wait_for_bundle() instead. Every iteration ends with yield().
//
// All exceptions are caught and logged at debug level; the function is declared
// throw (), so nothing propagates to the caller.
224  void BinaryStreamClient::Sender::run() throw ()
225  {
226  Registration &reg = _client._client.getRegistration(); // registration the bundles are taken from
227 
228  try {
229  while (_client.good())
230  {
231  try {
232  dtn::data::Bundle bundle = reg.receive();
233 
234  // process the bundle block (security, compression, ...)
// NOTE(review): listing line 235 is absent from this extract, so the statement
// announced by the comment above is not visible here - confirm against the
// repository.
236 
237  // add bundle to the queue
// NOTE(review): the bundle is queued before it is written out; presumably the
// queue is what the acknowledgement handling pops from - verify.
238  _client._sentqueue.push(bundle);
239 
240  // transmit the bundle
241  dtn::data::DefaultSerializer(_client._connection) << bundle;
242 
243  // mark the end of the bundle
244  _client._connection << std::flush;
245  } catch (const dtn::storage::NoBundleFoundException&) {
// nothing to send right now: wait on the registration, then loop again
246  reg.wait_for_bundle();
247  }
248 
249  // idle a little bit
250  yield();
251  }
252  } catch (const ibrcommon::QueueUnblockedException &ex) {
// logged at debug level 40, the other handlers below use level 10
253  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 40) << ex.what() << IBRCOMMON_LOGGER_ENDL;
254  return;
255  } catch (const ibrcommon::IOException &ex) {
256  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
257  } catch (const dtn::InvalidDataException &ex) {
258  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
259  } catch (const std::exception &ex) {
// catch-all for any other std::exception, logged with an explicit marker text
260  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 10) << "unexpected API error! " << ex.what() << IBRCOMMON_LOGGER_ENDL;
261  }
262  }
263 
265  {
266  _sender.push(bundle);
267  }
268  }
269 }