Skip to content

Commit

Permalink
Merge pull request #24 from RicrodoZ/master
Browse files Browse the repository at this point in the history
[refactor] Added consistent hash load balancing mode.
  • Loading branch information
Snailclimb authored Oct 24, 2020
2 parents 65b3a17 + c3af785 commit 3110191
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 13 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
- [x] Netty 重用 Channel 避免重复连接服务端
- [x] 使用 `CompletableFuture` 包装接受客户端返回结果(之前的实现是通过 `AttributeMap` 绑定到 Channel 上实现的) 详见:[使用 CompletableFuture 优化接受服务提供端返回结果](./docs/使用CompletableFuture优化接受服务提供端返回结果.md)
- [x] **增加 Netty 心跳机制** : 保证客户端和服务端的连接不被断掉,避免重连。
- [x] **客户端调用远程服务的时候进行负载均衡** :调用服务的时候,从很多服务地址中根据相应的负载均衡算法选取一个服务地址。ps:目前只实现了随机负载均衡算法
- [x] **客户端调用远程服务的时候进行负载均衡** :调用服务的时候,从很多服务地址中根据相应的负载均衡算法选取一个服务地址。ps:目前实现了随机负载均衡算法与一致性哈希算法
- [x] **处理一个接口有多个类实现的情况** :对服务分组,发布服务的时候增加一个 group 参数即可。
- [x] **集成 Spring 通过注解注册服务**
- [x] **集成 Spring 通过注解进行服务消费** 。参考: [PR#10](https://github.com/Snailclimb/guide-rpc-framework/pull/10)
Expand Down
9 changes: 9 additions & 0 deletions config/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,15 @@
<module name="TodoComment"/>
<module name="UpperEll"/>

<module name="Indentation">
<property name="basicOffset" value="4"/>
<property name="braceAdjustment" value="0"/>
<property name="caseIndent" value="4"/>
<property name="throwsIndent" value="8"/>
<property name="lineWrappingIndentation" value="8"/>
<property name="arrayInitIndent" value="4"/>
</module>

</module>

</module>
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@
*/
public abstract class AbstractLoadBalance implements LoadBalance {
@Override
public String selectServiceAddress(List<String> serviceAddresses) {
public String selectServiceAddress(List<String> serviceAddresses, String rpcServiceName) {
if (serviceAddresses == null || serviceAddresses.size() == 0) {
return null;
}
if (serviceAddresses.size() == 1) {
return serviceAddresses.get(0);
}
return doSelect(serviceAddresses);
return doSelect(serviceAddresses, rpcServiceName);
}

protected abstract String doSelect(List<String> serviceAddresses);
protected abstract String doSelect(List<String> serviceAddresses, String rpcServiceName);

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package github.javaguide.loadbalance;

import github.javaguide.extension.SPI;

import java.util.List;

/**
Expand All @@ -8,12 +10,13 @@
* @author shuang.kou
* @createTime 2020年06月21日 07:44:00
*/
@SPI
public interface LoadBalance {
/**
* Choose one from the list of existing service addresses list
*
* @param serviceAddresses Service address list
* @return target service address
*/
String selectServiceAddress(List<String> serviceAddresses);
String selectServiceAddress(List<String> serviceAddresses, String rpcServiceName);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package github.javaguide.loadbalance.loadbalancer;

import github.javaguide.loadbalance.AbstractLoadBalance;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

/**
* refer to dubbo consistent hash load balance: http://dubbo.apache.org/zh-cn/blog/dubbo-consistent-hash-implementation.html
* @author RicardoZ
* @createTime 2020年10月20日 18:15:20
*/
@Slf4j
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
private final ConcurrentHashMap<String, ConsistentHashSelector> selectors = new ConcurrentHashMap<>();

@Override
protected String doSelect(List<String> serviceAddresses, String rpcServiceName) {
int identityHashCode = System.identityHashCode(serviceAddresses);

ConsistentHashSelector selector = selectors.get(rpcServiceName);

// check for updates
if (selector == null || selector.identityHashCode != identityHashCode) {
selectors.put(rpcServiceName, new ConsistentHashSelector(serviceAddresses, 160, identityHashCode));
selector = selectors.get(rpcServiceName);
}

return selector.select(rpcServiceName);
}

static class ConsistentHashSelector {
private final TreeMap<Long, String> virtualInvokers;

private final int identityHashCode;

ConsistentHashSelector(List<String> invokers, int replicaNumber, int identityHashCode) {
this.virtualInvokers = new TreeMap<>();
this.identityHashCode = identityHashCode;

for (String invoker : invokers) {
for (int i = 0; i < replicaNumber / 4; i++) {
byte[] digest = md5(invoker + i);
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
}

static byte[] md5(String key) {
MessageDigest md = null;

try {
md = MessageDigest.getInstance("md5");
byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
md.update(bytes);
} catch (NoSuchAlgorithmException e) {
log.error("An encryption algorithm that does not exist is used: ", e);
e.printStackTrace();
}

return md.digest();
}

static long hash(byte[] digest, int idx) {
return ((long) (digest[3 + idx * 4] & 255) << 24 | (long) (digest[2 + idx * 4] & 255) << 16 | (long) (digest[1 + idx * 4] & 255) << 8 | (long) (digest[idx * 4] & 255)) & 4294967295L;
}

public String select(String rpcServiceName) {
byte[] digest = md5(rpcServiceName);
return selectForKey(hash(digest, 0));
}

public String selectForKey(long hashCode) {
Map.Entry<Long, String> entry = virtualInvokers.tailMap(hashCode, true).firstEntry();

if (entry == null) {
entry = virtualInvokers.firstEntry();
}

return entry.getValue();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
package github.javaguide.loadbalance;
package github.javaguide.loadbalance.loadbalancer;

import github.javaguide.loadbalance.AbstractLoadBalance;

import java.util.List;
import java.util.Random;
Expand All @@ -11,7 +13,7 @@
*/
public class RandomLoadBalance extends AbstractLoadBalance {
@Override
protected String doSelect(List<String> serviceAddresses) {
protected String doSelect(List<String> serviceAddresses, String rpcServiceName) {
Random random = new Random();
return serviceAddresses.get(random.nextInt(serviceAddresses.size()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import github.javaguide.enums.RpcErrorMessageEnum;
import github.javaguide.exception.RpcException;
import github.javaguide.extension.ExtensionLoader;
import github.javaguide.loadbalance.LoadBalance;
import github.javaguide.loadbalance.RandomLoadBalance;
import github.javaguide.registry.ServiceDiscovery;
import github.javaguide.registry.zk.util.CuratorUtils;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -23,7 +23,7 @@ public class ZkServiceDiscovery implements ServiceDiscovery {
private final LoadBalance loadBalance;

public ZkServiceDiscovery() {
this.loadBalance = new RandomLoadBalance();
this.loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("loadBalance");
}

@Override
Expand All @@ -34,7 +34,7 @@ public InetSocketAddress lookupService(String rpcServiceName) {
throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND, rpcServiceName);
}
// load balancing
String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList);
String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList, rpcServiceName);
log.info("Successfully found the service address:[{}]", targetServiceUrl);
String[] socketAddressArray = targetServiceUrl.split(":");
String host = socketAddressArray[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public SocketRpcRequestHandlerRunnable(Socket socket) {
public void run() {
log.info("server handle message from client by thread: [{}]", Thread.currentThread().getName());
try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
Object result = rpcRequestHandler.handle(rpcRequest);
objectOutputStream.writeObject(RpcResponse.success(result, rpcRequest.getRequestId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class KryoSerializer implements Serializer {
@Override
public byte[] serialize(Object obj) {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
Output output = new Output(byteArrayOutputStream)) {
Output output = new Output(byteArrayOutputStream)) {
Kryo kryo = kryoThreadLocal.get();
// Object->byte:将对象序列化为byte数组
kryo.writeObject(output, obj);
Expand All @@ -48,7 +48,7 @@ public byte[] serialize(Object obj) {
@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
Input input = new Input(byteArrayInputStream)) {
Input input = new Input(byteArrayInputStream)) {
Kryo kryo = kryoThreadLocal.get();
// byte->Object:从byte数组中反序列化出对对象
Object o = kryo.readObject(input, clazz);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
loadBalance=github.javaguide.loadbalance.loadbalancer.ConsistentHashLoadBalance

0 comments on commit 3110191

Please sign in to comment.