properties for connectors (#3)
* fix properties for connectors
* update connect
* cleanup
laurentleseigneur authored Jul 9, 2024
1 parent b000b9c commit 6f857b4
Showing 5 changed files with 20 additions and 18 deletions.
@@ -68,36 +68,38 @@ protected void executeBusinessLogic() throws ConnectorException {
String server = getInputParameter(KAFKA_SERVERS).toString();
String groupId = getInputParameter(KAFKA_GROUP_ID).toString();
String user = getInputParameter(KAFKA_USER).toString();
String password = getInputParameter(KAFKA_PASSWORD).toString();
String topic = getInputParameter(KAFKA_TOPIC).toString();
Integer timeout = (Integer) getInputParameter(KAFKA_TIMEOUT);

this.consumer = new EventConsumer();

LOGGER.info(String.format("KAFKA_SERVER: %s", server));
LOGGER.info(String.format("KAFKA_GROUP_ID: %s", groupId));
LOGGER.info(String.format("KAFKA_USER: %s", user));
LOGGER.info(String.format("KAFKA_PASSWORD: %s", "***"));
LOGGER.info(String.format("KAFKA_TOPIC: %s", topic));
LOGGER.info(String.format("KAFKA_TIMEOUT: %s", timeout));
LOGGER.info("Getting messages...");
connect();
setOutputParameter(KAFKA_RESPONSE, consumer.get(topic, timeout));
LOGGER.info("Got messages...");
}

/**
* [Optional] Open a connection to remote server
*/
@Override
public void connect() throws ConnectorException {
LOGGER.info("Creating consumer...");
LOGGER.info("Connecting consumer...");
consumer = new EventConsumer();
consumer.createConsumer((String) getInputParameter(KAFKA_SERVERS), (String) getInputParameter(KAFKA_USER),
(String) getInputParameter(KAFKA_PASSWORD), getInputParameter(KAFKA_GROUP_ID).toString());
LOGGER.info("Connected consumer...");
}

/**
* [Optional] Close connection to remote server
*/
@Override
public void disconnect() throws ConnectorException {
LOGGER.info("Disconnecting consumer...");
LOGGER.info("Disconnected consumer...");
}
}
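
With this change, connect() both creates and configures the EventConsumer before executeBusinessLogic() polls the topic. Below is a minimal sketch of driving the connector end to end, mirroring the integration test further down in this commit; the broker address, the credentials, and the import of the connector class (its package is not shown in this diff) are assumptions, not part of the repository.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.bonitasoft.engine.connector.ConnectorException;
// The KafkaConsumer connector class comes from this repository; its package is not shown here.

public class KafkaConsumerConnectorDemo {

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws ConnectorException {
        Map<String, Object> parameters = new HashMap<>();
        parameters.put(KafkaConsumer.KAFKA_SERVERS, "localhost:9093");   // hypothetical broker
        parameters.put(KafkaConsumer.KAFKA_GROUP_ID, "demo-group");
        parameters.put(KafkaConsumer.KAFKA_USER, "demo-user");           // hypothetical credentials
        parameters.put(KafkaConsumer.KAFKA_PASSWORD, "demo-password");
        parameters.put(KafkaConsumer.KAFKA_TOPIC, "demo-topic");
        parameters.put(KafkaConsumer.KAFKA_TIMEOUT, 5000);

        KafkaConsumer connector = new KafkaConsumer();
        connector.setInputParameters(parameters);
        connector.connect();                                   // creates and connects the EventConsumer
        try {
            Map<String, Object> outputs = connector.execute(); // runs executeBusinessLogic()
            ConsumerRecords<String, String> records =
                    (ConsumerRecords<String, String>) outputs.get(KafkaConsumer.KAFKA_RESPONSE);
            System.out.println("Polled " + records.count() + " record(s)");
        } finally {
            connector.disconnect();                            // currently only logs, kept for symmetry
        }
    }
}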
@@ -15,15 +15,9 @@
import org.apache.kafka.common.serialization.StringDeserializer;

public class EventConsumer {

private static final Logger LOGGER = LoggerFactory.getLogger(EventConsumer.class);

private KafkaConsumer<String, String> consumer;

public EventConsumer() {

}

public void createConsumer(String kafkaServer, String kafkaUser, String kafkaPassword, String kafkaGroupId) {
// create consumer configs
Properties properties = new Properties();
@@ -38,23 +32,24 @@ public void createConsumer(String kafkaServer, String kafkaUser, String kafkaPas
}

public ConsumerRecords<String, String> get(String topic, int timeout) {
ConsumerRecords<String, String> records = null;
ConsumerRecords<String, String> records=null;
try {
this.consumer.subscribe(Arrays.asList(topic));
records = this.consumer.poll(Duration.ofMillis(timeout));
consumer.subscribe(Arrays.asList(topic));
records = consumer.poll(Duration.ofMillis(timeout));
for (ConsumerRecord<String, String> record : records) {
LOGGER.info("Key: {}, Value: {}", record.key(), record.value());
LOGGER.info("Partition: {}, Offset: {}", record.partition(), record.offset());
}
this.consumer.close();
consumer.close();
return records;
} catch (WakeupException e) {
LOGGER.info("Wake up exception!");
// we ignore this as this is an expected exception when closing a consumer
} catch (Exception e) {
LOGGER.error(e.getMessage());
throw (e);
}
return records;
return records;
}

}
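
The body of createConsumer() that fills the Properties object is collapsed in this view. For context, here is a minimal sketch of what such a consumer configuration typically looks like, assuming SASL/PLAIN authentication for the user/password pair; the actual settings used in this repository are not visible in the diff and may differ.

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;

public class EventConsumerConfigSketch {

    public static KafkaConsumer<String, String> createConsumer(
            String kafkaServer, String kafkaUser, String kafkaPassword, String kafkaGroupId) {
        Properties properties = new Properties();
        // Connection and group settings taken from the connector inputs.
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Assumed SASL/PLAIN authentication; the real security configuration may differ.
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        properties.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required"
                        + " username=\"" + kafkaUser + "\" password=\"" + kafkaPassword + "\";");
        return new KafkaConsumer<>(properties);
    }
}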
@@ -12,7 +12,7 @@ kafkaServersWidget.label=Servers
kafkaServersWidget.description=Servers url.

kafkaGroupIdWidget.label=GroupId
kafkaGroupIdWidget.description=Group Id.
kafkaGroupIdWidget.description=GroupId.

kafkaUserWidget.label=Username
kafkaUserWidget.description=Servers authentication username.
2 changes: 1 addition & 1 deletion src/main/resources-filtered/connector-kafka-producer.def
@@ -10,7 +10,7 @@
<input mandatory="false" name="kafkaUser" type="java.lang.String"/>
<input mandatory="false" name="kafkaPassword" type="java.lang.String"/>
<input mandatory="true" name="kafkaTopic" type="java.lang.String"/>
<input mandatory="true" name="kafkaId" type="java.lang.Long"/>
<input mandatory="true" name="kafkaId" type="java.lang.String"/>
<input mandatory="true" name="kafkaMessage" type="java.lang.String"/>
<input mandatory="true" name="kafkaTimeout" type="java.lang.Integer"/>

@@ -47,6 +47,7 @@ void should_create_output_for_valid_input() throws ConnectorException, Execution
//given
createKafkaMessage(kafka, "key", "value");

//when
Map<String, Object> parameters = new HashMap<>();
String server = String.format("%s:%s", SERVER, kafka.getMappedPort(9093));
parameters.put(KafkaConsumer.KAFKA_SERVERS, server);
@@ -55,8 +56,12 @@
parameters.put(KafkaConsumer.KAFKA_PASSWORD, PASSWORD);
parameters.put(KafkaConsumer.KAFKA_TOPIC, TOPIC);
parameters.put(KafkaConsumer.KAFKA_TIMEOUT, TIMEOUT);

connector.setInputParameters(parameters);
connector.connect();
Map<String, Object> outputs = connector.execute();

//then
assertThat(outputs).containsKey(KAFKA_RESPONSE);
ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) outputs.get(KAFKA_RESPONSE);
assertThat(records.count()).isEqualTo(1);
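
The createKafkaMessage(...) helper used in the //given step is not part of this diff. A minimal sketch of what such a helper might look like, assuming kafka is a Testcontainers KafkaContainer reachable without authentication; the real helper, its topic constant, and any SASL settings in the test may differ.

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.testcontainers.containers.KafkaContainer;

class KafkaTestHelperSketch {

    private static final String TOPIC = "test-topic";   // placeholder; the test uses its own TOPIC constant

    static void createKafkaMessage(KafkaContainer kafka, String key, String value)
            throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() is asynchronous; get() blocks until the broker acknowledges the record
            producer.send(new ProducerRecord<>(TOPIC, key, value)).get();
        }
    }
}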
