40 const std::string DatagramConvergenceLayer::TAG =
"DatagramConvergenceLayer";
43 : _service(ds), _receiver(*this),
_running(false),
44 _stats_in(0), _stats_out(0), _stats_rtt(0.0), _stats_retries(0), _stats_failure(0)
56 while (_connections.size() > 0) _cond_connections.
wait();
70 NodeGone *gone =
new NodeGone();
71 gone->eid =
event.getNode().getEID();
72 _action_queue.push(gone);
74 }
catch (
const std::bad_cast&) { };
88 std::stringstream ss_format;
97 ss_format << _stats_in;
98 data[IN_TAG] = ss_format.str();
101 ss_format << _stats_out;
102 data[OUT_TAG] = ss_format.str();
105 ss_format << _stats_rtt;
106 data[RTT_TAG] = ss_format.str();
109 ss_format << _stats_retries;
110 data[RETRIES_TAG] = ss_format.str();
113 ss_format << _stats_failure;
114 data[FAIL_TAG] = ss_format.str();
128 _service->send(HEADER_SEGMENT, flags, seqno, destination, buf, len);
140 _service->send(HEADER_ACK, 0, seqno, destination, NULL, 0);
149 _service->send(HEADER_NACK, 0, seqno, destination, NULL, 0);
155 if (!_running)
return;
157 const std::list<dtn::core::Node::URI> uri_list = node.
get(_service->
getProtocol());
158 if (uri_list.empty())
return;
166 QueueBundle *
queue =
new QueueBundle(job);
167 queue->uri = uri.
value;
169 _action_queue.
push( queue );
177 for(connection_list::const_iterator i = _connections.begin(); i != _connections.end(); ++i)
179 if ((*i)->getIdentifier() == identifier)
194 _connections.push_back(connection);
197 _cond_connections.signal(
true);
208 _stats_retries += retries;
223 ConnectionDown *cd =
new ConnectionDown();
225 _action_queue.
push(cd);
233 }
catch (
const std::exception &e) {
255 _action_queue.
push(
new Shutdown());
261 if (iface != _service->getInterface())
return;
267 std::streamsize len = ss.str().size();
274 _service->send(HEADER_BROADCAST, 0, 0, ss.str().c_str(),
static_cast<dtn::data::Length>(len));
284 unsigned int seqno = 0;
287 std::vector<char> data(maxlen);
297 len = _service->
recvfrom(&data[0], maxlen, type, flags, seqno, address);
320 ss.write(&data[0], len);
326 if (beacon.isShort())
335 BeaconReceived *bc =
new BeaconReceived();
336 bc->address = address;
338 _action_queue.
push(bc);
347 SegmentReceived *seg =
new SegmentReceived(maxlen);
348 seg->address = address;
353 _action_queue.
push(seg);
357 AckReceived *ack =
new AckReceived();
358 ack->address = address;
360 _action_queue.
push(ack);
364 NackReceived *nack =
new NackReceived();
365 nack->address = address;
368 _action_queue.
push(nack);
385 while (_running || (_connections.size() > 0))
387 Action *action = _action_queue.
getnpop(
true);
392 AckReceived &ack =
dynamic_cast<AckReceived&
>(*action);
401 connection.
ack(ack.seqno);
402 }
catch (
const ConnectionNotAvailableException &ex) {
405 }
catch (
const std::bad_cast&) { };
408 NackReceived &nack =
dynamic_cast<NackReceived&
>(*action);
418 connection.
nack(nack.seqno, nack.temporary);
419 }
catch (
const ConnectionNotAvailableException &ex) {
422 }
catch (
const std::bad_cast&) { };
425 SegmentReceived &segment =
dynamic_cast<SegmentReceived&
>(*action);
432 connection.
queue(segment.flags, segment.seqno, &segment.data[0], segment.len);
437 }
catch (
const std::bad_cast&) { };
440 BeaconReceived &beacon =
dynamic_cast<BeaconReceived&
>(*action);
448 }
catch (
const std::bad_cast&) { };
451 ConnectionDown &cd =
dynamic_cast<ConnectionDown&
>(*action);
454 for (connection_list::iterator i = _connections.begin(); i != _connections.end(); ++i)
456 if ((*i)->getIdentifier() == cd.id)
459 _connections.erase(i);
465 _cond_connections.
signal(
true);
469 }
catch (
const std::bad_cast&) { };
472 NodeGone &gone =
dynamic_cast<NodeGone&
>(*action);
474 for (connection_list::iterator i = _connections.begin(); i != _connections.end(); ++i)
476 if ((*i)->getPeerEID() == gone.eid)
483 }
catch (
const std::bad_cast&) { };
486 QueueBundle &
queue =
dynamic_cast<QueueBundle&
>(*action);
492 conn.
queue(queue.job);
493 }
catch (
const std::bad_cast&) { };
496 dynamic_cast<Shutdown&
>(*action);
499 for(connection_list::const_iterator i = _connections.begin(); i != _connections.end(); ++i)
505 }
catch (
const std::bad_cast&) { };
526 return DatagramConvergenceLayer::TAG;
534 DatagramConvergenceLayer::Receiver::~Receiver()
541 if (JoinableThread::isFinalized()) JoinableThread::reset();
544 void DatagramConvergenceLayer::Receiver::run() throw ()
549 void DatagramConvergenceLayer::Receiver::__cancellation() throw ()