NetworkClient — Non-Blocking Network KafkaClient

NetworkClient is a non-blocking KafkaClient that uses a Selectable for network communication (i.e. sending and receiving messages).

Note
Selector is the one and only Selectable that uses Java’s selectable channels for stream-oriented connecting sockets (i.e. Java’s java.nio.channels.SocketChannel).

NetworkClient performs the actual reads and writes (to sockets) on every poll.
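To make "non-blocking" concrete, here is a minimal sketch that uses only Java's own java.nio API, not Kafka's Selector or NetworkClient. The class name NonBlockingConnectSketch and the loopback server are assumptions made so the example runs without a broker. It shows the pattern the text describes: a connect call that returns immediately, followed by a poll on a selector that reports when the socket is ready.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class NonBlockingConnectSketch {

    /** Connects to a local server socket without blocking; returns true once connected. */
    static boolean connectNonBlocking() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            // Listen on an ephemeral loopback port so the sketch needs no external broker.
            server.bind(new InetSocketAddress("127.0.0.1", 0));

            channel.configureBlocking(false);
            // In non-blocking mode connect() returns right away, finished or not.
            boolean connected = channel.connect(server.getLocalAddress());
            if (!connected) {
                channel.register(selector, SelectionKey.OP_CONNECT);
            }
            while (!connected) {
                // One "poll": wait up to one second for the socket to become connectable.
                if (selector.select(1000) == 0) {
                    return false; // timed out
                }
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isConnectable()) {
                        connected = channel.finishConnect();
                    }
                }
                selector.selectedKeys().clear();
            }
            return channel.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connected = " + connectNonBlocking());
    }
}
```

The calling thread never waits inside connect itself; all waiting happens in the selector's select call, which is why a single thread can serve many connections.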

Figure 1. NetworkClient

NetworkClient uses the MetadataUpdater for the following:

NetworkClient is created when:

  • KafkaConsumer is created (with ConsumerNetworkClient)

  • KafkaProducer is created (with Sender)

  • KafkaAdminClient is created (using createInternal)

  • AdminClient is created (with ConsumerNetworkClient)

  • ControllerChannelManager is requested to addNewBroker (and creates a RequestSendThread daemon thread and a ControllerBrokerStateInfo)

  • TransactionMarkerChannelManager is created

  • KafkaServer does doControlledShutdown

  • ReplicaFetcherBlockingSend is created

Table 1. NetworkClient’s Internal Properties (e.g. Registries and Counters)
Name Description

connectionStates

ClusterConnectionStates

Used when…​FIXME

Tip

Enable DEBUG logging level for org.apache.kafka.clients.NetworkClient logger to see what happens inside.

Add the following line to config/tools-log4j.properties (for Kafka tools):

log4j.logger.org.apache.kafka.clients.NetworkClient=DEBUG

Add the following line to config/log4j.properties:

log4j.logger.org.apache.kafka.clients.NetworkClient=DEBUG

Refer to Logging.

Establishing Connection to Broker Node — initiateConnect Internal Method

void initiateConnect(Node node, long now)

initiateConnect prints out the following DEBUG message to the logs:

Initiating connection to node [node]

initiateConnect requests the ClusterConnectionStates to enter the connecting state for the connection to the broker node.

initiateConnect requests the Selectable to connect to the broker node (at a given host and port).

Note
initiateConnect passes the sizes of send and receive buffers for the socket connection.

In case of an IO failure, initiateConnect requests the ClusterConnectionStates to enter the disconnected state for the connection to the broker node.

initiateConnect requests the MetadataUpdater for update.

You should see the following DEBUG message in the logs:

Error connecting to node [node]
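The flow described above can be sketched in plain Java. This is a simplified model and not Kafka's code: the Connector interface, the State enum, the metadataUpdateRequested flag and the buffer sizes are all hypothetical stand-ins for the Selectable, the ClusterConnectionStates and the MetadataUpdater.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

class InitiateConnectSketch {

    enum State { CONNECTING, DISCONNECTED }

    /** Hypothetical stand-in for the Selectable's connect operation. */
    interface Connector {
        void connect(String nodeId, String host, int port,
                     int sendBufferBytes, int receiveBufferBytes) throws IOException;
    }

    /** Hypothetical stand-in for ClusterConnectionStates. */
    final Map<String, State> connectionStates = new HashMap<>();
    /** Hypothetical stand-in for "MetadataUpdater was requested for update". */
    boolean metadataUpdateRequested = false;

    private final Connector connector;

    InitiateConnectSketch(Connector connector) {
        this.connector = connector;
    }

    void initiateConnect(String nodeId, String host, int port) {
        System.out.println("Initiating connection to node " + nodeId);
        connectionStates.put(nodeId, State.CONNECTING);
        try {
            // Buffer sizes are arbitrary illustrative values.
            connector.connect(nodeId, host, port, 128 * 1024, 32 * 1024);
        } catch (IOException e) {
            // IO failure: mark the node disconnected and ask for fresh metadata.
            connectionStates.put(nodeId, State.DISCONNECTED);
            metadataUpdateRequested = true;
            System.out.println("Error connecting to node " + nodeId);
        }
    }

    public static void main(String[] args) {
        InitiateConnectSketch client = new InitiateConnectSketch(
            (id, host, port, snd, rcv) -> { throw new IOException("connection refused"); });
        client.initiateConnect("1", "localhost", 9092);
        System.out.println(client.connectionStates);
    }
}
```

Note that the node enters the connecting state before the connect call is attempted, so a failure only has to move it to the disconnected state.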
Note

initiateConnect is used when:

ready Method

boolean ready(Node node, long now)
Note
ready is a part of KafkaClient Contract.

ready…​FIXME

wakeup Method

void wakeup()
Note
wakeup is a part of KafkaClient Contract.

wakeup simply requests the internal Selectable to wakeup.
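What a wakeup does can be illustrated with Java's own java.nio.channels.Selector, which is not Kafka's Selectable but behaves analogously for this purpose. In the sketch below (the class and method names are hypothetical), one thread blocks in select with a long timeout and a second thread calls wakeup to make it return early.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

class SelectorWakeupSketch {

    /** Returns how long (in ms) select() actually blocked before wakeup() released it. */
    static long blockedMillis(long wakeupAfterMs) {
        try (Selector selector = Selector.open()) {
            Thread waker = new Thread(() -> {
                try {
                    Thread.sleep(wakeupAfterMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                // Makes a blocked (or the next) select() call return immediately.
                selector.wakeup();
            });
            long start = System.nanoTime();
            waker.start();
            selector.select(10_000); // would block for 10 seconds without the wakeup
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            waker.join();
            return elapsedMs;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("select() returned after ~" + blockedMillis(100) + " ms");
    }
}
```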

Note
wakeup is used when…​FIXME

Reading and Writing to Socket — poll Method

List<ClientResponse> poll(
  long timeout,
  long now)
Note
poll is a part of the KafkaClient Contract.

poll requests the Selectable to poll.

In the end, poll handles completed sends, completed receives and disconnections, records any connections to new brokers, initiates API version requests, expires timed-out in-flight requests, and finally triggers the RequestCompletionHandlers of the responses.

In case abortedSends is not empty, poll creates a collection of ClientResponse with abortedSends, triggers their RequestCompletionHandlers and returns them.
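The order of the steps can be sketched as a toy model. Nothing here is Kafka code: the Response class, the trace list and the step labels are hypothetical, and clearing abortedSends after they are reported is an assumption. The sketch only shows the control flow, with aborted sends short-circuiting the network poll.

```java
import java.util.ArrayList;
import java.util.List;

class PollOrderSketch {

    /** Hypothetical stand-in for ClientResponse: a label plus a completion handler. */
    static final class Response {
        final String label;
        final Runnable handler;

        Response(String label, Runnable handler) {
            this.label = label;
            this.handler = handler;
        }
    }

    final List<Response> abortedSends = new ArrayList<>();
    /** Records the order of the steps so the flow can be inspected. */
    final List<String> trace = new ArrayList<>();

    List<Response> poll() {
        List<Response> responses = new ArrayList<>();
        if (!abortedSends.isEmpty()) {
            // Short-circuit: no network I/O, only complete the aborted sends.
            responses.addAll(abortedSends);
            abortedSends.clear(); // assumption: aborted sends are reported only once
            completeResponses(responses);
            return responses;
        }
        trace.add("selectable.poll");
        trace.add("completed sends");
        trace.add("completed receives");
        trace.add("disconnections");
        trace.add("new connections");
        trace.add("initiate API version requests");
        trace.add("timed-out requests");
        completeResponses(responses);
        return responses;
    }

    private void completeResponses(List<Response> responses) {
        trace.add("completeResponses");
        for (Response response : responses) {
            response.handler.run();
        }
    }

    public static void main(String[] args) {
        PollOrderSketch client = new PollOrderSketch();
        client.poll();
        System.out.println(client.trace);
    }
}
```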

handleCompletedReceives Internal Method

void handleCompletedReceives(
  List<ClientResponse> responses,
  long now)

handleCompletedReceives…​FIXME

Note
handleCompletedReceives is used exclusively when NetworkClient is requested to poll.

Creating NetworkClient Instance

NetworkClient takes the following when created:

NetworkClient initializes the internal registries and counters.

Informing ClientResponse about Response Being Completed — completeResponses Internal Method

void completeResponses(List<ClientResponse> responses)

completeResponses informs every ClientResponse (in the input responses) that a response has been completed.

In case of any exception, completeResponses prints out the following ERROR message to the logs:

Uncaught error in request completion: [exception]
Note
completeResponses is used when NetworkClient is requested to poll (for both abortedSends and completed actions).
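A minimal sketch of this pattern follows, with a hypothetical Response interface in place of ClientResponse. Unlike the void method above, it returns the error lines instead of logging them, purely so the behaviour is easy to inspect. It also assumes that one failing completion does not stop the remaining responses from being completed.

```java
import java.util.ArrayList;
import java.util.List;

class CompleteResponsesSketch {

    /** Hypothetical stand-in for ClientResponse. */
    interface Response {
        void onComplete();
    }

    /** Completes every response; returns the error lines that would have been logged. */
    static List<String> completeResponses(List<Response> responses) {
        List<String> errors = new ArrayList<>();
        for (Response response : responses) {
            try {
                response.onComplete();
            } catch (Exception e) {
                // Catching per response keeps one bad handler from affecting the rest.
                errors.add("Uncaught error in request completion: " + e);
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        List<Response> responses = new ArrayList<>();
        responses.add(() -> System.out.println("first completed"));
        responses.add(() -> { throw new IllegalStateException("boom"); });
        responses.add(() -> System.out.println("third completed"));
        completeResponses(responses).forEach(System.out::println);
    }
}
```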

Creating ClientRequest — newClientRequest Method

ClientRequest newClientRequest(
  String nodeId,
  AbstractRequest.Builder<?> requestBuilder,
  long createdTimeMs,
  boolean expectResponse,
  int requestTimeoutMs,
  RequestCompletionHandler callback)
Note
newClientRequest is part of the KafkaClient Contract to…​FIXME.

newClientRequest simply creates a new ClientRequest (with the input parameters, the current correlation ID, which is then incremented, and the clientId).
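The role of the correlation counter can be shown with a small model. The Request class and its fields are hypothetical, and starting the counter at 0 is an assumption. The point is only that each new request receives the next correlation ID, so that a response can later be matched to the request that caused it.

```java
class CorrelationSketch {

    /** Hypothetical stand-in for ClientRequest, reduced to a few fields. */
    static final class Request {
        final String nodeId;
        final int correlationId;
        final String clientId;
        final int requestTimeoutMs;

        Request(String nodeId, int correlationId, String clientId, int requestTimeoutMs) {
            this.nodeId = nodeId;
            this.correlationId = correlationId;
            this.clientId = clientId;
            this.requestTimeoutMs = requestTimeoutMs;
        }
    }

    private final String clientId;
    private int correlation = 0; // assumption: the counter starts at 0

    CorrelationSketch(String clientId) {
        this.clientId = clientId;
    }

    Request newClientRequest(String nodeId, int requestTimeoutMs) {
        // Post-increment: use the current ID, then advance the counter.
        return new Request(nodeId, correlation++, clientId, requestTimeoutMs);
    }

    public static void main(String[] args) {
        CorrelationSketch client = new CorrelationSketch("demo-client");
        Request first = client.newClientRequest("1", 30_000);
        Request second = client.newClientRequest("1", 30_000);
        System.out.println(first.correlationId + " then " + second.correlationId);
    }
}
```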

sendInternalMetadataRequest Internal Method

void sendInternalMetadataRequest(
  MetadataRequest.Builder builder,
  String nodeConnectionId,
  long now)

sendInternalMetadataRequest…​FIXME

Note
sendInternalMetadataRequest is used exclusively when DefaultMetadataUpdater is requested to maybeUpdate.

doSend Internal Method

void doSend(
  ClientRequest clientRequest,
  boolean isInternalRequest,
  long now)
void doSend(
  ClientRequest clientRequest,
  boolean isInternalRequest,
  long now,
  AbstractRequest request)

doSend…​FIXME

Note
doSend is used when NetworkClient is requested to send, sendInternalMetadataRequest and handleInitiateApiVersionRequests.

send Method

void send(ClientRequest request, long now)
Note
send is part of the KafkaClient Contract to…​FIXME.

send…​FIXME

handleDisconnections Internal Method

void handleDisconnections(List<ClientResponse> responses, long now)

handleDisconnections…​FIXME

Note
handleDisconnections is used exclusively when NetworkClient is requested to poll.

handleTimedOutRequests Internal Method

void handleTimedOutRequests(List<ClientResponse> responses, long now)

handleTimedOutRequests…​FIXME

Note
handleTimedOutRequests is used exclusively when NetworkClient is requested to poll.

processDisconnection Internal Method

void processDisconnection(
  List<ClientResponse> responses,
  String nodeId,
  long now,
  ChannelState disconnectState)

processDisconnection…​FIXME

Note
processDisconnection is used when NetworkClient is requested to handleTimedOutRequests, handleApiVersionsResponse, and handleDisconnections.

handleApiVersionsResponse Internal Method

void handleApiVersionsResponse(
  List<ClientResponse> responses,
  InFlightRequest req,
  long now,
  ApiVersionsResponse apiVersionsResponse)

handleApiVersionsResponse…​FIXME

Note
handleApiVersionsResponse is used exclusively when NetworkClient is requested to handleCompletedReceives (when requested to poll).

leastLoadedNode Method

Node leastLoadedNode(long now)
Note
leastLoadedNode is part of the KafkaClient Contract to…​FIXME.

leastLoadedNode…​FIXME

close Method

void close()
Note
close is part of Java’s java.io.Closeable to close this stream and release any system resources associated with it.

close…​FIXME

close Method

void close(String nodeId)
Note
close is part of the KafkaClient Contract to…​FIXME.

close…​FIXME

isReady Method

boolean isReady(Node node, long now)
Note
isReady is part of the KafkaClient Contract to…​FIXME.

isReady…​FIXME

disconnect Method

void disconnect(String nodeId)
Note
disconnect is part of the KafkaClient Contract to…​FIXME.

disconnect…​FIXME