37 struct blocked_neighbour_t {
38 struct blocked_neighbour_t *
next;
44 clock_time_t timestamp;
50 LIST(transmission_ticket_list);
56 LIST(blocked_neighbour_list);
63 int convergence_layer_set_blocked(rimeaddr_t * neighbour);
64 int convergence_layer_set_unblocked(rimeaddr_t * neighbour);
69 PROCESS(convergence_layer_process,
"CL process");
101 uint8_t convergence_layer_backoff_pending = 0;
103 int convergence_layer_init(
void)
111 struct transmit_ticket_t * convergence_layer_get_transmit_ticket_priority(uint8_t priority)
117 LOG(LOGD_DTN, LOG_CL, LOGL_WRN,
"Cannot allocate ticket with low priority");
121 LOG(LOGD_DTN, LOG_CL, LOGL_DBG,
"Requested ticket with priority %d", priority);
124 ticket =
memb_alloc(&transmission_ticket_mem);
125 if( ticket ==
NULL ) {
126 LOG(LOGD_DTN, LOG_CL, LOGL_WRN,
"Cannot allocate ticket");
137 list_add(transmission_ticket_list, ticket);
140 list_push(transmission_ticket_list, ticket);
157 if( ticket->bundle !=
NULL ) {
159 ticket->bundle =
NULL;
162 LOG(LOGD_DTN, LOG_CL, LOGL_DBG,
"Freeing ticket %p", ticket);
165 if( (ticket->flags &
CONVERGENCE_LAYER_QUEUE_ACTIVE) || (ticket->flags & CONVERGENCE_LAYER_QUEUE_DONE) || (ticket->flags & CONVERGENCE_LAYER_QUEUE_FAIL) ) {
175 memb_free(&transmission_ticket_mem, ticket);
182 if( ticket ==
NULL ) {
183 LOG(LOGD_DTN, LOG_CL, LOGL_WRN,
"Cannot enqueue invalid ticket %p", ticket);
187 LOG(LOGD_DTN, LOG_CL, LOGL_DBG,
"Enqueuing bundle %lu to %u.%u, queue is at %u entries", ticket->bundle_number, ticket->neighbour.u8[0], ticket->neighbour.u8[1],
convergence_layer_queue);
206 uint8_t * buffer =
NULL;
207 uint8_t buffer_length = 0;
209 LOG(LOGD_DTN, LOG_CL, LOGL_DBG,
"Sending bundle %lu to %u.%u with ticket %p", ticket->bundle_number, ticket->neighbour.u8[0], ticket->neighbour.u8[1], ticket);
212 if( ticket->bundle ==
NULL ) {
213 ticket->bundle =
BUNDLE_STORAGE.read_bundle(ticket->bundle_number);
214 if( ticket->bundle ==
NULL ) {
215 LOG(LOGD_DTN, LOG_CL, LOGL_ERR,
"Unable to read bundle %lu", ticket->bundle_number);
223 if( bundle ==
NULL ) {
224 LOG(LOGD_DTN, LOG_CL, LOGL_ERR,
"Invalid bundle pointer for bundle %lu", ticket->bundle_number);
226 ticket->bundle =
NULL;
231 if( bundle->lifetime == 0 ) {
232 LOG(LOGD_DTN, LOG_CL, LOGL_INF,
"Bundle %lu has expired, not sending it", ticket->bundle_number);
238 BUNDLE_STORAGE.del_bundle(ticket->bundle_number, REASON_LIFETIME_EXPIRED);
245 if( buffer ==
NULL ) {
247 ticket->bundle =
NULL;
268 ticket->flags = CONVERGENCE_LAYER_QUEUE_FAIL;
271 ROUTING.sent(ticket, ROUTING_STATUS_ERROR);
277 ticket->flags |= CONVERGENCE_LAYER_QUEUE_IN_TRANSIT;
283 convergence_layer_set_blocked(&ticket->neighbour);
291 int convergence_layer_send_discovery(uint8_t * payload, uint8_t length, rimeaddr_t * neighbour)
293 uint8_t * buffer =
NULL;
295 LOG(LOGD_DTN, LOG_CL, LOGL_DBG,
"Sending discovery to %u.%u", neighbour->u8[0], neighbour->u8[1]);
304 if( buffer ==
NULL ) {
309 buffer[0] = CONVERGENCE_LAYER_TYPE_DISCOVERY;
312 memcpy(buffer + 1, payload, length);
323 int convergence_layer_send_ack(rimeaddr_t * destination, uint8_t sequence_number, uint8_t type,
struct transmit_ticket_t * ticket)
325 uint8_t * buffer =
NULL;
327 LOG(LOGD_DTN, LOG_CL, LOGL_DBG,
"Sending ACK or NACK to %u.%u for SeqNo %u with ticket %p", destination->u8[0], destination->u8[1], sequence_number, ticket);
332 ticket->timestamp = 0;
339 if( buffer ==
NULL ) {
343 if( type == CONVERGENCE_LAYER_TYPE_ACK ) {
346 buffer[0] |= CONVERGENCE_LAYER_TYPE_ACK & CONVERGENCE_LAYER_MASK_TYPE;
347 buffer[0] |= (sequence_number << 2) & CONVERGENCE_LAYER_MASK_SEQNO;
348 }
else if( type == CONVERGENCE_LAYER_TYPE_NACK ) {
351 buffer[0] |= CONVERGENCE_LAYER_TYPE_NACK & CONVERGENCE_LAYER_MASK_TYPE;
352 buffer[0] |= (sequence_number << 2) & CONVERGENCE_LAYER_MASK_SEQNO;
367 int convergence_layer_create_send_ack(rimeaddr_t * destination, uint8_t sequence_number, uint8_t type)
372 ticket = convergence_layer_get_transmit_ticket_priority(CONVERGENCE_LAYER_PRIORITY_HIGH);
373 if( ticket ==
NULL ) {
374 LOG(LOGD_DTN, LOG_CL, LOGL_WRN,
"Unable to allocate ticket to potentially retransmit ACK/NACK");
377 ticket->sequence_number = sequence_number;
378 ticket->flags |= CONVERGENCE_LAYER_QUEUE_IN_TRANSIT;
380 if( type == CONVERGENCE_LAYER_TYPE_ACK ) {
381 ticket->flags |= CONVERGENCE_LAYER_QUEUE_ACK;
383 ticket->flags |= CONVERGENCE_LAYER_QUEUE_NACK;
387 return convergence_layer_send_ack(destination, sequence_number, type, ticket);
393 if( (!(ticket->flags & CONVERGENCE_LAYER_QUEUE_ACK) && !(ticket->flags & CONVERGENCE_LAYER_QUEUE_NACK)) || (ticket->flags & CONVERGENCE_LAYER_QUEUE_IN_TRANSIT) ) {
402 ticket->flags |= CONVERGENCE_LAYER_QUEUE_IN_TRANSIT;
404 if( ticket->flags & CONVERGENCE_LAYER_QUEUE_ACK ) {
405 type = CONVERGENCE_LAYER_TYPE_ACK;
406 }
else if( ticket->flags & CONVERGENCE_LAYER_QUEUE_NACK ) {
407 type = CONVERGENCE_LAYER_TYPE_NACK;
409 LOG(LOGD_DTN, LOG_CL, LOGL_ERR,
"Unknown control packet type");
413 convergence_layer_send_ack(&ticket->neighbour, ticket->sequence_number, type, ticket);
418 int convergence_layer_parse_dataframe(rimeaddr_t * source, uint8_t * payload, uint8_t length, uint8_t
flags, uint8_t sequence_number, packetbuf_attr_t rssi)
420 struct mmem * bundlemem =
NULL;
425 LOG(LOGD_DTN, LOG_CL, LOGL_ERR,
"Bundle received %p from %u.%u with invalid flags %02X", payload, source->u8[0], source->u8[1], flags);
431 LOG(LOGD_DTN, LOG_CL, LOGL_WRN,
"Error recovering bundle");
437 LOG(LOGD_DTN, LOG_CL, LOGL_WRN,
"Invalid bundle pointer");
443 bundle->source_process = &agent_process;
445 LOG(LOGD_DTN, LOG_CL, LOGL_DBG,
"Bundle %lu received from %u.%u with SeqNo %u", bundle->bundle_num, source->u8[0], source->u8[1], sequence_number);
459 convergence_layer_create_send_ack(source, sequence_number + 1, CONVERGENCE_LAYER_TYPE_ACK);
462 convergence_layer_create_send_ack(source, sequence_number, CONVERGENCE_LAYER_TYPE_NACK);
468 int convergence_layer_parse_ackframe(rimeaddr_t * source, uint8_t * payload, uint8_t length, uint8_t sequence_number, uint8_t type)
474 convergence_layer_set_unblocked(source);
481 LOG(LOGD_DTN, LOG_CL, LOGL_DBG,
"Incoming ACK from %u.%u for SeqNo %u", source->u8[0], source->u8[1], sequence_number);
483 for(ticket =
list_head(transmission_ticket_list);
486 if(
rimeaddr_cmp(source, &ticket->neighbour) && (ticket->flags & CONVERGENCE_LAYER_QUEUE_ACK_PEND) ) {
492 if( ticket ==
NULL ) {
497 if( type == CONVERGENCE_LAYER_TYPE_ACK && ticket->bundle !=
NULL ) {
501 if( bundle->flags & BUNDLE_FLAG_REP_FWD ) {
506 if( type == CONVERGENCE_LAYER_TYPE_ACK ) {
508 ticket->flags = CONVERGENCE_LAYER_QUEUE_DONE;
512 }
else if( type == CONVERGENCE_LAYER_TYPE_NACK ) {
514 ticket->flags = CONVERGENCE_LAYER_QUEUE_FAIL;
517 ROUTING.sent(ticket, ROUTING_STATUS_NACK);
521 if( ticket->bundle !=
NULL ) {
523 ticket->bundle =
NULL;
529 int convergence_layer_incoming_frame(rimeaddr_t * source, uint8_t * payload, uint8_t length, packetbuf_attr_t rssi)
531 uint8_t * data_pointer =
NULL;
532 uint8_t data_length = 0;
536 LOG(LOGD_DTN, LOG_CL, LOGL_DBG,
"Incoming frame from %u.%u", source->u8[0], source->u8[1]);
543 LOG(LOGD_DTN, LOG_CL, LOGL_INF,
"Ignoring incoming frame from %u.%u", source->u8[0], source->u8[1]);
548 data_pointer = payload + 1;
549 data_length = length - 1;
554 int sequence_number = 0;
556 flags = (header & CONVERGENCE_LAYER_MASK_FLAGS) >> 0;
557 sequence_number = (header & CONVERGENCE_LAYER_MASK_SEQNO) >> 2;
559 LOG(LOGD_DTN, LOG_CL, LOGL_DBG,
"Incoming data frame from %u.%u with SeqNo %u", source->u8[0], source->u8[1], sequence_number);
562 ret = convergence_layer_parse_dataframe(source, data_pointer, data_length, flags, sequence_number, rssi);
566 convergence_layer_create_send_ack(source, sequence_number, CONVERGENCE_LAYER_TYPE_NACK);
572 if( (header & CONVERGENCE_LAYER_MASK_TYPE) == CONVERGENCE_LAYER_TYPE_DISCOVERY ) {
574 LOG(LOGD_DTN, LOG_CL, LOGL_DBG,
"Incoming discovery frame from %u.%u", source->u8[0], source->u8[1]);
576 DISCOVERY.receive(source, data_pointer, data_length);
581 if( (header & CONVERGENCE_LAYER_MASK_TYPE) == CONVERGENCE_LAYER_TYPE_ACK ) {
583 int sequence_number = 0;
585 sequence_number = (header & CONVERGENCE_LAYER_MASK_SEQNO) >> 2;
587 LOG(LOGD_DTN, LOG_CL, LOGL_DBG,
"Incoming Ack frame from %u.%u with SeqNo %u", source->u8[0], source->u8[1], sequence_number);
589 convergence_layer_parse_ackframe(source, data_pointer, data_length, sequence_number, CONVERGENCE_LAYER_TYPE_ACK);
594 if( (header & CONVERGENCE_LAYER_MASK_TYPE) == CONVERGENCE_LAYER_TYPE_NACK ) {
596 int sequence_number = 0;
598 sequence_number = (header & CONVERGENCE_LAYER_MASK_SEQNO) >> 2;
600 LOG(LOGD_DTN, LOG_CL, LOGL_DBG,
"Incoming Nack frame from %u.%u with SeqNo %u", source->u8[0], source->u8[1], sequence_number);
602 convergence_layer_parse_ackframe(source, data_pointer, data_length, sequence_number, CONVERGENCE_LAYER_TYPE_NACK);
610 int convergence_layer_status(
void * pointer, uint8_t outcome)
614 LOG(LOGD_DTN, LOG_CL, LOGL_DBG,
"MAC callback for %p with %d", pointer, outcome);
621 if( outcome == CONVERGENCE_LAYER_STATUS_NOSEND ) {
624 convergence_layer_backoff_pending = 1;
633 if( pointer ==
NULL ) {
640 if( (ticket->flags & CONVERGENCE_LAYER_QUEUE_ACK) || (ticket->flags & CONVERGENCE_LAYER_QUEUE_NACK) ) {
642 ticket->flags &= ~CONVERGENCE_LAYER_QUEUE_IN_TRANSIT;
647 convergence_layer_free_transmit_ticket(ticket);
653 if( outcome == CONVERGENCE_LAYER_STATUS_FATAL ) {
654 convergence_layer_free_transmit_ticket(ticket);
658 if( outcome == CONVERGENCE_LAYER_STATUS_NOSEND ) {
660 ticket->timestamp = 0;
661 ticket->failed_tries ++;
669 LOG(LOGD_DTN, LOG_CL, LOGL_WRN,
"CL: Giving up on ticket %p after %d (or %d) tries", ticket, ticket->tries, ticket->failed_tries);
670 convergence_layer_free_transmit_ticket(ticket);
683 LOG(LOGD_DTN, LOG_CL, LOGL_DBG,
"LL Ack received, waiting for App-layer ACK with SeqNo %u", ticket->sequence_number);
686 if( ticket->bundle !=
NULL ) {
688 ticket->bundle =
NULL;
695 if( outcome == CONVERGENCE_LAYER_STATUS_FATAL ) {
697 convergence_layer_set_unblocked(&ticket->neighbour);
700 ROUTING.sent(ticket, ROUTING_STATUS_ERROR);
706 convergence_layer_set_unblocked(&ticket->neighbour);
709 if( outcome == CONVERGENCE_LAYER_STATUS_NOACK ) {
711 }
else if( outcome == CONVERGENCE_LAYER_STATUS_NOSEND ) {
712 ticket->failed_tries ++;
717 ticket->flags = CONVERGENCE_LAYER_QUEUE_FAIL;
720 ROUTING.sent(ticket, ROUTING_STATUS_FAIL);
723 if( ticket->bundle !=
NULL ) {
725 ticket->bundle =
NULL;
741 LOG(LOGD_DTN, LOG_CL, LOGL_DBG,
"Deleting tickets for bundle %lu", bundle_number);
743 for(ticket =
list_head(transmission_ticket_list);
746 if( ticket->bundle_number == bundle_number ) {
752 if( ticket ==
NULL ) {
757 convergence_layer_free_transmit_ticket(ticket);
764 struct blocked_neighbour_t * n =
NULL;
766 for( n =
list_head(blocked_neighbour_list);
777 int convergence_layer_set_blocked(rimeaddr_t * neighbour)
779 struct blocked_neighbour_t * n =
NULL;
783 LOG(LOGD_DTN, LOG_CL, LOGL_ERR,
"Cannot allocate neighbour memory");
792 list_add(blocked_neighbour_list, n);
797 int convergence_layer_set_unblocked(rimeaddr_t * neighbour)
799 struct blocked_neighbour_t * n =
NULL;
801 for( n =
list_head(blocked_neighbour_list);
814 void check_blocked_neighbours() {
815 struct blocked_neighbour_t * n =
NULL;
818 for( n =
list_head(blocked_neighbour_list);
836 for( ticket =
list_head(transmission_ticket_list);
839 if(
rimeaddr_cmp(&ticket->neighbour, &n->neighbour) && (ticket->flags & CONVERGENCE_LAYER_QUEUE_ACK_PEND) ) {
845 convergence_layer_set_unblocked(&n->neighbour);
847 LOG(LOGD_DTN, LOG_CL, LOGL_WRN,
"Neighbour %u.%u stale, removing lock", n->neighbour.u8[0], n->neighbour.u8[1]);
850 if( ticket ==
NULL ) {
863 int convergence_layer_neighbour_down(rimeaddr_t * neighbour) {
867 if( neighbour ==
NULL ) {
875 for(ticket =
list_head(transmission_ticket_list);
881 ROUTING.sent(ticket, ROUTING_STATUS_FAIL);
893 convergence_layer_set_unblocked(neighbour);
901 static struct etimer stale_timer;
914 LOG(LOGD_DTN, LOG_CL, LOGL_INF,
"CL process is running");
929 check_blocked_neighbours();
939 convergence_layer_backoff_pending = 0;
952 for(ticket =
list_head(transmission_ticket_list);
955 if( ((ticket->flags & CONVERGENCE_LAYER_QUEUE_ACK) || (ticket->flags & CONVERGENCE_LAYER_QUEUE_NACK)) && !(ticket->flags & CONVERGENCE_LAYER_QUEUE_IN_TRANSIT) ) {
956 n = convergence_layer_resend_ack(ticket);
974 convergence_layer_send_bundle(ticket);