IBR-DTNSuite  0.12
BinaryStreamClient.cpp
Go to the documentation of this file.
1 /*
2  * BinaryStreamClient.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "config.h"
23 #include "Configuration.h"
24 #include "api/BinaryStreamClient.h"
25 #include "core/GlobalEvent.h"
26 #include "core/BundleCore.h"
27 #include "core/BundleEvent.h"
29 #include <ibrdtn/data/Serializer.h>
30 #include <iostream>
31 #include <ibrcommon/Logger.h>
32 
33 namespace dtn
34 {
35  namespace api
36  {
38  : ProtocolHandler(client, stream), _sender(*this), _connection(*this, _stream), _lastack(0)
39  {
40  }
41 
43  {
45  _sender.join();
46  }
47 
49  {
50  return _eid;
51  }
52 
54  {
55  }
56 
58  {
59  }
60 
62  {
63  }
64 
66  {
67  Registration &reg = _client.getRegistration();
68 
69  if (header._localeid.isNone())
70  {
71  // create an EID based on the registration handle
72  _eid = reg.getDefaultEID();
73  }
74  else
75  {
76  // contact received event
77  _eid = BundleCore::local;
78  _eid.setApplication( header._localeid.getSSP() );
79  }
80 
81  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 20) << "new client connected, handle: " << reg.getHandle() << "; eid: "<< _eid.getString() << IBRCOMMON_LOGGER_ENDL;
82 
83  reg.subscribe(_eid);
84  }
85 
87  {
88  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 40) << "BinaryStreamClient::eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL;
89 
91 
92  try {
93  // stop the sender
94  _sender.stop();
95  } catch (const ibrcommon::ThreadException &ex) {
96  IBRCOMMON_LOGGER_TAG("BinaryStreamClient", error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
97  }
98  }
99 
101  {
102  try {
103  const dtn::data::Bundle bundle = _sentqueue.getnpop();
104 
105  // set ACK to zero
106  _lastack = 0;
107 
108  } catch (const ibrcommon::QueueUnblockedException&) {
109  // pop on empty queue!
110  }
111  }
112 
114  {
115  try {
116  const dtn::data::Bundle bundle = _sentqueue.getnpop();
118 
119  // notify bundle as delivered
121 
122  // set ACK to zero
123  _lastack = 0;
124  } catch (const ibrcommon::QueueUnblockedException&) {
125  // pop on empty queue!
126  }
127  }
128 
130  {
131  _lastack = ack;
132  }
133 
135  {
136  // shutdown
138 
139  // close the stream
140  _stream.close();
141  }
142 
144  {
145  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 60) << "BinaryStreamClient down" << IBRCOMMON_LOGGER_ENDL;
146 
147  // abort blocking registrations
149 
150  // close the stream
151  _stream.close();
152 
153  try {
154  // shutdown the sender thread
155  _sender.stop();
156  } catch (const std::exception&) { };
157  }
158 
160  {
161  try {
162  char flags = 0;
163 
164  // request acknowledgements
166 
167  // do the handshake
168  _connection.handshake(dtn::core::BundleCore::local, 10, flags);
169 
170  // start the sender thread
171  _sender.start();
172 
173  while (_connection.good())
174  {
175  dtn::data::Bundle bundle;
176  dtn::data::DefaultDeserializer(_connection) >> bundle;
177 
178  // process the new bundle
180  }
181  } catch (const ibrcommon::ThreadException &ex) {
182  IBRCOMMON_LOGGER_TAG("BinaryStreamClient", error) << "failed to start thread: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
184  } catch (const dtn::SerializationFailedException &ex) {
185  IBRCOMMON_LOGGER_TAG("BinaryStreamClient", error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
187  } catch (const ibrcommon::IOException &ex) {
188  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
190  } catch (const dtn::InvalidDataException &ex) {
191  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
193  } catch (const std::exception &ex) {
194  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
196  }
197  }
198 
200  {
201  return _stream.good();
202  }
203 
204  BinaryStreamClient::Sender::Sender(BinaryStreamClient &client)
205  : _client(client)
206  {
207  }
208 
209  BinaryStreamClient::Sender::~Sender()
210  {
212  }
213 
214  void BinaryStreamClient::Sender::__cancellation() throw ()
215  {
216  // cancel the main thread in here
217  this->abort();
218 
219  // abort all blocking calls on the registration object
220  _client._client.getRegistration().abort();
221  }
222 
223  void BinaryStreamClient::Sender::run() throw ()
224  {
225  Registration &reg = _client._client.getRegistration();
226 
227  try {
228  while (_client.good())
229  {
230  try {
231  dtn::data::Bundle bundle = reg.receive();
232 
233  // process the bundle block (security, compression, ...)
235 
236  // add bundle to the queue
237  _client._sentqueue.push(bundle);
238 
239  // transmit the bundle
240  dtn::data::DefaultSerializer(_client._connection) << bundle;
241 
242  // mark the end of the bundle
243  _client._connection << std::flush;
244  } catch (const dtn::storage::NoBundleFoundException&) {
245  reg.wait_for_bundle();
246  }
247 
248  // idle a little bit
249  yield();
250  }
251  } catch (const ibrcommon::QueueUnblockedException &ex) {
252  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 40) << ex.what() << IBRCOMMON_LOGGER_ENDL;
253  return;
254  } catch (const ibrcommon::IOException &ex) {
255  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
256  } catch (const dtn::InvalidDataException &ex) {
257  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
258  } catch (const std::exception &ex) {
259  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 10) << "unexpected API error! " << ex.what() << IBRCOMMON_LOGGER_ENDL;
260  }
261  }
262 
264  {
265  _sender.push(bundle);
266  }
267  }
268 }