From 8e2ca75849ce26b8be17d8d10778a30192e5ddee Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Tue, 15 Dec 2020 23:12:43 -0800 Subject: [PATCH] reset link credit with drain feature --- inc/azure_uamqp_c/link.h | 1 + src/link.c | 68 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+) diff --git a/inc/azure_uamqp_c/link.h b/inc/azure_uamqp_c/link.h index 4037fb98..d93a79b9 100644 --- a/inc/azure_uamqp_c/link.h +++ b/inc/azure_uamqp_c/link.h @@ -75,6 +75,7 @@ MOCKABLE_FUNCTION(, int, link_attach, LINK_HANDLE, link, ON_TRANSFER_RECEIVED, o MOCKABLE_FUNCTION(, int, link_detach, LINK_HANDLE, link, bool, close, const char*, error_condition, const char*, error_description, AMQP_VALUE, info); MOCKABLE_FUNCTION(, ASYNC_OPERATION_HANDLE, link_transfer_async, LINK_HANDLE, handle, message_format, message_format, PAYLOAD*, payloads, size_t, payload_count, ON_DELIVERY_SETTLED, on_delivery_settled, void*, callback_context, LINK_TRANSFER_RESULT*, link_transfer_result,tickcounter_ms_t, timeout); MOCKABLE_FUNCTION(, void, link_dowork, LINK_HANDLE, link); +MOCKABLE_FUNCTION(, int, link_reset_link_credit, LINK_HANDLE, link, uint32_t, link_credit, bool, drain); MOCKABLE_FUNCTION(, ON_LINK_DETACH_EVENT_SUBSCRIPTION_HANDLE, link_subscribe_on_link_detach_received, LINK_HANDLE, link, ON_LINK_DETACH_RECEIVED, on_link_detach_received, void*, context); MOCKABLE_FUNCTION(, void, link_unsubscribe_on_link_detach_received, ON_LINK_DETACH_EVENT_SUBSCRIPTION_HANDLE, event_subscription); diff --git a/src/link.c b/src/link.c index 95b24fd0..fbda2228 100644 --- a/src/link.c +++ b/src/link.c @@ -1151,6 +1151,74 @@ int link_set_max_link_credit(LINK_HANDLE link, uint32_t max_link_credit) return result; } +int link_reset_link_credit(LINK_HANDLE link, uint32_t link_credit, bool drain) +{ + int result; + FLOW_HANDLE flow; + LINK_INSTANCE* link_instance = (LINK_INSTANCE*)link; + + if (link == NULL) + { + result = MU_FAILURE; + } + else + { + if(link_instance->role == role_sender) + { + LogError("Sender is not allowed to reset link credit"); + result = MU_FAILURE; + } + else + { + link->current_link_credit = link_credit; + + flow = flow_create(0, 0, 0); + if (flow == NULL) + { + LogError("NULL flow performative"); + result = MU_FAILURE; + } + else + { + if (flow_set_link_credit(flow, link->current_link_credit) != 0) + { + LogError("Cannot set link credit on flow performative"); + result = MU_FAILURE; + } + else if (flow_set_handle(flow, link->handle) != 0) + { + LogError("Cannot set handle on flow performative"); + result = MU_FAILURE; + } + else if (flow_set_delivery_count(flow, link->delivery_count) != 0) + { + LogError("Cannot set delivery count on flow performative"); + result = MU_FAILURE; + } + else if (drain && flow_set_drain(flow, drain) != 0) + { + LogError("Cannot set drain on flow performative"); + result = MU_FAILURE; + } + else + { + if (session_send_flow(link->link_endpoint, flow) != 0) + { + LogError("Sending flow frame failed in session send"); + result = MU_FAILURE; + } + else + { + result = 0; + } + } + flow_destroy(flow); + } + } + } + return result; +} + int link_attach(LINK_HANDLE link, ON_TRANSFER_RECEIVED on_transfer_received, ON_LINK_STATE_CHANGED on_link_state_changed, ON_LINK_FLOW_ON on_link_flow_on, void* callback_context) { int result;