Skip to content

Commit

Permalink
Merge pull request #2283 from arunans23/template-new
Browse files Browse the repository at this point in the history
Improve invoke mediator to support connector response model
  • Loading branch information
arunans23 authored Jan 22, 2025
2 parents 766acc0 + 0f3ab20 commit c63a88b
Show file tree
Hide file tree
Showing 9 changed files with 316 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,9 @@ public enum ENDPOINT_TIMEOUT_TYPE { ENDPOINT_TIMEOUT, GLOBAL_TIMEOUT, HTTP_CONNE
public static final String OAUTH_TAG = "oauth";
public static final String SCATTER_MESSAGES = "SCATTER_MESSAGES";
public static final String CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER = "CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER";
public static final String RESPONSE_VARIABLE = "responseVariable";
public static final String OVERWRITE_BODY = "overwriteBody";
public static final String ORIGINAL_PAYLOAD = "ORIGINAL_PAYLOAD";

public static final String DEFAULT_ERROR_TYPE = "ANY";
public static final String ERROR_STATS_REPORTED = "ERROR_STATS_REPORTED";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.synapse.mediators.elementary.EnrichMediator;
import org.apache.synapse.mediators.elementary.Source;
import org.apache.synapse.mediators.elementary.Target;
import org.apache.synapse.util.CallMediatorEnrichUtil;
import org.apache.synapse.util.MediatorEnrichUtil;
import org.jaxen.JaxenException;

import javax.xml.namespace.QName;
Expand Down Expand Up @@ -110,7 +110,7 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) {
OMElement targetEle = elem.getFirstChildWithName(TARGET_Q);
if (targetEle != null) {
if (sourceEle == null) {
Source source = CallMediatorEnrichUtil.createSourceWithBody();
Source source = MediatorEnrichUtil.createSourceWithBody();
callMediator.setSourceAvailable(true);
callMediator.setSourceForOutboundPayload(source);
}
Expand Down Expand Up @@ -168,7 +168,7 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) {
private void populateSource(CallMediator callMediator, Source source, OMElement sourceEle) {
OMAttribute typeAttr = sourceEle.getAttribute(ATT_TYPE);
if (typeAttr != null && typeAttr.getAttributeValue() != null) {
source.setSourceType(CallMediatorEnrichUtil.convertTypeToInt(typeAttr.getAttributeValue()));
source.setSourceType(MediatorEnrichUtil.convertTypeToInt(typeAttr.getAttributeValue()));
}

OMAttribute contentTypeAtt = sourceEle.getAttribute(CONTENT_TYPE);
Expand Down Expand Up @@ -220,7 +220,7 @@ private void populateTarget(CallMediator callMediator, Target target, OMElement
OMAttribute typeAttr = sourceEle.getAttribute(ATT_TYPE);
target.setAction("replace");
if (typeAttr != null && typeAttr.getAttributeValue() != null) {
int type = CallMediatorEnrichUtil.convertTypeToInt(typeAttr.getAttributeValue());
int type = MediatorEnrichUtil.convertTypeToInt(typeAttr.getAttributeValue());
if (type >= 0) {
target.setTargetType(type);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
import org.apache.synapse.transport.util.MessageHandlerProvider;
import org.apache.synapse.transport.customlogsetter.CustomLogSetter;
import org.apache.synapse.unittest.UnitTestingExecutor;
import org.apache.synapse.util.CallMediatorEnrichUtil;
import org.apache.synapse.util.MediatorEnrichUtil;
import org.apache.synapse.util.concurrent.InboundThreadPool;
import org.apache.synapse.util.concurrent.SynapseThreadPool;
import org.apache.synapse.util.logging.LoggingUtils;
Expand Down Expand Up @@ -879,26 +879,26 @@ private void callMediatorPostMediate(MessageContext response) {
Target targetForResponsePayload;

if (isTargetAvailable) {
CallMediatorEnrichUtil.buildMessage(response);
MediatorEnrichUtil.buildMessage(response);
}
if (isTargetAvailable && isSourceAvailable) {
sourceForResponsePayload = CallMediatorEnrichUtil.createSourceWithBody();
sourceForOriginalPayload = CallMediatorEnrichUtil.createSourceWithProperty(INTERMEDIATE_ORIGINAL_BODY);
targetForResponsePayload = CallMediatorEnrichUtil.createTargetWithBody();
CallMediatorEnrichUtil
sourceForResponsePayload = MediatorEnrichUtil.createSourceWithBody();
sourceForOriginalPayload = MediatorEnrichUtil.createSourceWithProperty(INTERMEDIATE_ORIGINAL_BODY);
targetForResponsePayload = MediatorEnrichUtil.createTargetWithBody();
MediatorEnrichUtil
.doEnrich(response, sourceForResponsePayload, targetForInboundPayload, sourceMessageType);
CallMediatorEnrichUtil
MediatorEnrichUtil
.doEnrich(response, sourceForOriginalPayload, targetForResponsePayload, originalMessageType);
CallMediatorEnrichUtil.preservetransportHeaders(response, originalTransportHeaders);
MediatorEnrichUtil.preservetransportHeaders(response, originalTransportHeaders);
if (!sourceMessageType.equalsIgnoreCase(originalMessageType)) {
CallMediatorEnrichUtil.setContentType(response, originalMessageType, originalContentType);
MediatorEnrichUtil.setContentType(response, originalMessageType, originalContentType);
if (sourceMessageType.equalsIgnoreCase(JSON_TYPE)) {
JsonUtil.removeJsonStream(((Axis2MessageContext) response).getAxis2MessageContext());
}
}
} else if (isTargetAvailable) {
sourceForResponsePayload = CallMediatorEnrichUtil.createSourceWithBody();
CallMediatorEnrichUtil
sourceForResponsePayload = MediatorEnrichUtil.createSourceWithBody();
MediatorEnrichUtil
.doEnrich(response, sourceForResponsePayload, targetForInboundPayload, sourceMessageType);
}
response.setProperty(IS_SOURCE_AVAILABLE, false);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2025, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* you may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.synapse.data.connector;

import java.util.Map;

public interface ConnectorResponse {

void setPayload(Object payload);

Object getPayload();

void setHeaders(Map<String, Object> headers);

Map<String, Object> getHeaders();

void setAttributes(Map<String, Object> attributes);

Map<String, Object> getAttributes();

void addAttribute(String key, Object value);

void removeAttribute(String key);

void addHeader(String key, String value);

void removeHeader(String key);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (c) 2025, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* you may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.synapse.data.connector;

import java.util.HashMap;
import java.util.Map;

public class DefaultConnectorResponse implements ConnectorResponse {

private Object payload;
private Map<String, Object> headers = new HashMap<>();
private Map<String, Object> attributes = new HashMap<>();

@Override
public void setPayload(Object payload) {
this.payload = payload;
}

@Override
public Object getPayload() {
return payload;
}

@Override
public void setHeaders(Map<String, Object> headers) {
this.headers = headers;
}

@Override
public Map<String, Object> getHeaders() {
return headers;
}

@Override
public void setAttributes(Map<String, Object> attributes) {
this.attributes = attributes;
}

@Override
public Map<String, Object> getAttributes() {
return attributes;
}

@Override
public void addAttribute(String key, Object value) {
attributes.put(key, value);
}

@Override
public void removeAttribute(String key) {
attributes.remove(key);
}

@Override
public void addHeader(String key, String value) {
headers.put(key, value);
}

@Override
public void removeHeader(String key) {
headers.remove(key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.apache.synapse.mediators.elementary.Source;
import org.apache.synapse.mediators.elementary.Target;
import org.apache.synapse.message.senders.blocking.BlockingMsgSender;
import org.apache.synapse.util.CallMediatorEnrichUtil;
import org.apache.synapse.util.MediatorEnrichUtil;
import org.apache.synapse.util.MessageHelper;

import java.io.IOException;
Expand Down Expand Up @@ -158,33 +158,33 @@ public boolean mediate(MessageContext synInCtx) {
sourceMessageType = originalMessageType;
}
if (isSourceAvailable) {
CallMediatorEnrichUtil.buildMessage(synInCtx);
MediatorEnrichUtil.buildMessage(synInCtx);
}
if (isSourceAvailable && isTargetAvailable) {
Source sourceForInboundPayload = CallMediatorEnrichUtil.createSourceWithBody();
Source sourceForInboundPayload = MediatorEnrichUtil.createSourceWithBody();
Target targetForOriginalPayload =
CallMediatorEnrichUtil.createTargetWithProperty(INTERMEDIATE_ORIGINAL_BODY);
Target targetForOutboundPayload = CallMediatorEnrichUtil.createTargetWithBody();
MediatorEnrichUtil.createTargetWithProperty(INTERMEDIATE_ORIGINAL_BODY);
Target targetForOutboundPayload = MediatorEnrichUtil.createTargetWithBody();
sourceForInboundPayload.setClone(true);
CallMediatorEnrichUtil
MediatorEnrichUtil
.doEnrich(synInCtx, sourceForInboundPayload, targetForOriginalPayload, originalMessageType);
if (!(EnrichMediator.BODY == sourceForOutboundPayload.getSourceType() &&
EnrichMediator.BODY == targetForOutboundPayload.getTargetType())) {
CallMediatorEnrichUtil
MediatorEnrichUtil
.doEnrich(synInCtx, sourceForOutboundPayload, targetForOutboundPayload, getSourceMessageType());
}
if (!sourceMessageType.equalsIgnoreCase(originalMessageType)) {
CallMediatorEnrichUtil.setContentType(synInCtx, sourceMessageType, sourceMessageType);
MediatorEnrichUtil.setContentType(synInCtx, sourceMessageType, sourceMessageType);
if (originalMessageType.equalsIgnoreCase(JSON_TYPE)) {
JsonUtil.removeJsonStream(axis2MessageContext);
}
}
} else if (isSourceAvailable) {
Target targetForOutboundPayload = CallMediatorEnrichUtil.createTargetWithBody();
CallMediatorEnrichUtil
Target targetForOutboundPayload = MediatorEnrichUtil.createTargetWithBody();
MediatorEnrichUtil
.doEnrich(synInCtx, sourceForOutboundPayload, targetForOutboundPayload, getSourceMessageType());
if (!sourceMessageType.equalsIgnoreCase(originalMessageType)) {
CallMediatorEnrichUtil.setContentType(synInCtx, sourceMessageType, sourceMessageType);
MediatorEnrichUtil.setContentType(synInCtx, sourceMessageType, sourceMessageType);
if (originalMessageType.equalsIgnoreCase(JSON_TYPE)) {
JsonUtil.removeJsonStream(axis2MessageContext);
}
Expand Down Expand Up @@ -280,27 +280,27 @@ private boolean handleBlockingCall(MessageContext synInCtx, String originalMessa
public void postMediate(MessageContext response, String originalMessageType, String originalContentType,
Map originalTransportHeaders) {
if (isTargetAvailable()) {
CallMediatorEnrichUtil.buildMessage(response);
MediatorEnrichUtil.buildMessage(response);
}
if (isTargetAvailable && isSourceAvailable) {
Source sourceForResponsePayload = CallMediatorEnrichUtil.createSourceWithBody();
Source sourceForResponsePayload = MediatorEnrichUtil.createSourceWithBody();
Source sourceForOriginalPayload =
CallMediatorEnrichUtil.createSourceWithProperty(INTERMEDIATE_ORIGINAL_BODY);
Target targetForResponsePayload = CallMediatorEnrichUtil.createTargetWithBody();
CallMediatorEnrichUtil.doEnrich(response, sourceForResponsePayload, targetForInboundPayload,
MediatorEnrichUtil.createSourceWithProperty(INTERMEDIATE_ORIGINAL_BODY);
Target targetForResponsePayload = MediatorEnrichUtil.createTargetWithBody();
MediatorEnrichUtil.doEnrich(response, sourceForResponsePayload, targetForInboundPayload,
getSourceMessageType());
CallMediatorEnrichUtil.doEnrich(response, sourceForOriginalPayload, targetForResponsePayload,
MediatorEnrichUtil.doEnrich(response, sourceForOriginalPayload, targetForResponsePayload,
originalMessageType);
CallMediatorEnrichUtil.preservetransportHeaders(response, originalTransportHeaders);
MediatorEnrichUtil.preservetransportHeaders(response, originalTransportHeaders);
if (!sourceMessageType.equalsIgnoreCase(originalMessageType)) {
CallMediatorEnrichUtil.setContentType(response, originalMessageType, originalContentType);
MediatorEnrichUtil.setContentType(response, originalMessageType, originalContentType);
if (sourceMessageType.equalsIgnoreCase(JSON_TYPE)) {
JsonUtil.removeJsonStream(((Axis2MessageContext) response).getAxis2MessageContext());
}
}
} else if (isTargetAvailable) {
Source sourceForResponsePayload = CallMediatorEnrichUtil.createSourceWithBody();
CallMediatorEnrichUtil.doEnrich(response, sourceForResponsePayload, targetForInboundPayload,
Source sourceForResponsePayload = MediatorEnrichUtil.createSourceWithBody();
MediatorEnrichUtil.doEnrich(response, sourceForResponsePayload, targetForInboundPayload,
getSourceMessageType());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
import org.apache.synapse.SynapseLog;
import org.apache.synapse.commons.json.Constants;
Expand All @@ -52,7 +51,7 @@
import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.mediators.Value;
import org.apache.synapse.mediators.eip.EIPUtils;
import org.apache.synapse.util.CallMediatorEnrichUtil;
import org.apache.synapse.util.MediatorEnrichUtil;
import org.apache.synapse.util.InlineExpressionUtil;
import org.apache.synapse.util.synapse.expression.constants.ExpressionConstants;
import org.apache.synapse.util.xpath.SynapseJsonPath;
Expand Down Expand Up @@ -253,7 +252,7 @@ public void insert(MessageContext synContext,
.getProperty(org.apache.axis2.context.MessageContext.TRANSPORT_HEADERS);
JsonObject headers = EIPUtils.convertMapToJsonObj(transportHeaders);
result.put(ExpressionConstants.HEADERS, headers);
result.put(ExpressionConstants.ATTRIBUTES, CallMediatorEnrichUtil.populateTransportAttributes(synContext));
result.put(ExpressionConstants.ATTRIBUTES, MediatorEnrichUtil.populateTransportAttributes(synContext));
synContext.setVariable(key, result);
} else {
synLog.error("Action " + action + " is not supported when enriching variables");
Expand Down Expand Up @@ -508,7 +507,7 @@ public void insertJson(MessageContext synCtx, Object sourceJsonElement, SynapseL
.getProperty(org.apache.axis2.context.MessageContext.TRANSPORT_HEADERS);
JsonObject headers = EIPUtils.convertMapToJsonObj(transportHeaders);
result.put(ExpressionConstants.HEADERS, headers);
result.put(ExpressionConstants.ATTRIBUTES, CallMediatorEnrichUtil.populateTransportAttributes(synCtx));
result.put(ExpressionConstants.ATTRIBUTES, MediatorEnrichUtil.populateTransportAttributes(synCtx));
synCtx.setVariable(key, result);
}
break;
Expand Down
Loading

0 comments on commit c63a88b

Please sign in to comment.