diff --git a/dapeng-service-invoke/pom.xml b/dapeng-service-invoke/pom.xml new file mode 100644 index 00000000..824ea4f7 --- /dev/null +++ b/dapeng-service-invoke/pom.xml @@ -0,0 +1,59 @@ + + + + dapeng-parent + com.isuwang + 1.2.2 + + 4.0.0 + + dapeng-service-invoke + + + + + + + + com.isuwang + dapeng-remoting-netty + 1.2.2 + + + + com.isuwang + dapeng-registry-zookeeper + 1.2.2 + + + + + + com.isuwang + dapeng-core + + + + com.google.code.gson + gson + 2.3.1 + + + + org.slf4j + slf4j-api + 1.7.12 + + + + com.google.guava + guava + 16.0.1 + + + + + + \ No newline at end of file diff --git a/dapeng-service-invoke/src/main/java/com/isuwang/service/invoke/ApiServices.java b/dapeng-service-invoke/src/main/java/com/isuwang/service/invoke/ApiServices.java new file mode 100644 index 00000000..8ea7e682 --- /dev/null +++ b/dapeng-service-invoke/src/main/java/com/isuwang/service/invoke/ApiServices.java @@ -0,0 +1,143 @@ +package com.isuwang.service.invoke; + +import com.google.common.collect.TreeMultimap; +import com.isuwang.dapeng.core.SoaException; +import com.isuwang.dapeng.core.metadata.*; +import com.isuwang.dapeng.registry.ServiceInfo; +import com.isuwang.dapeng.remoting.fake.metadata.MetadataClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.xml.bind.JAXB; +import java.io.StringReader; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +/** + * Created by Tony_PC on 2016/6/20. + */ +public class ApiServices { + + private static final Logger LOGGER = LoggerFactory.getLogger(ApiServices.class); + + private static Map services = new TreeMap<>(); + + private static Map fullNameService = new TreeMap<>(); + + public static TreeMultimap urlMappings = TreeMultimap.create(); + + private static ZookeeperWatcher zookeeperWatcher; + + public static void init() { + reloadServices(); + } + + public static void reloadServices() { + + final Map services = new TreeMap<>(); + urlMappings.clear(); + + Map> servicesInfo = zookeeperWatcher.getAvailableServices(); + + Set serviceKeys = servicesInfo.keySet(); + + for (String key : serviceKeys) { + String serviceName = key; + List serviceInfoList = servicesInfo.get(key); + for (ServiceInfo serviceInfo : serviceInfoList) { + String version = serviceInfo.getVersionName(); + String metadata = ""; + try { + metadata = new MetadataClient(serviceName, version).getServiceMetadata(); + if (metadata != null) { + try (StringReader reader = new StringReader(metadata)) { + Service serviceData = JAXB.unmarshal(reader, Service.class); + String serviceKey = getKey(serviceData); + if (!services.containsKey(serviceKey)) { + services.put(serviceKey, serviceData); + String fullNameKey = getFullNameKey(serviceData); + fullNameService.put(fullNameKey, serviceData); + loadResource(serviceData, services); + } + } catch (Exception e) { + LOGGER.error("JAXB 解析Service 出错"); + } + } + } catch (SoaException e) { + LOGGER.error("生成SERVICE出错", e); + } + } + } + ApiServices.services = services; + LOGGER.info("size of urlMapping: " + urlMappings.size()); + } + + public void destory() { + services.clear(); + } + + public static void loadResource(Service service, Map services) { + + //将service和service中的方法、结构体、枚举和字段名分别设置对应的url,以方便搜索 + urlMappings.put(service.getName(), "api/service/" + service.name + "/" + service.meta.version + ".htm"); + List methods = service.getMethods(); + for (int i = 0; i < methods.size(); i++) { + Method method = methods.get(i); + urlMappings.put(method.name, "api/method/" + service.name + "/" + service.meta.version + "/" + method.name + ".htm"); + } + + List structs = service.getStructDefinitions(); + for (int i = 0; i < structs.size(); i++) { + Struct struct = structs.get(i); + urlMappings.put(struct.name, "api/struct/" + service.name + "/" + service.meta.version + "/" + struct.namespace + "." + struct.name + ".htm"); + + List fields = struct.getFields(); + for (int j = 0; j < fields.size(); j++) { + Field field = fields.get(j); + urlMappings.put(field.name, "api/struct/" + service.name + "/" + service.meta.version + "/" + struct.namespace + "." + struct.name + ".htm"); + } + } + + List tEnums = service.getEnumDefinitions(); + for (int i = 0; i < tEnums.size(); i++) { + TEnum tEnum = tEnums.get(i); + urlMappings.put(tEnum.name, "api/enum/" + service.name + "/" + service.meta.version + "/" + tEnum.namespace + "." + tEnum.name + ".htm"); + } + + } + + public static Service getService(String name, String version) { + + if (name.contains(".")) + return fullNameService.get(getKey(name, version)); + else + return services.get(getKey(name, version)); + } + + private static String getKey(Service service) { + return getKey(service.getName(), service.getMeta().version); + } + + private static String getFullNameKey(Service service) { + return getKey(service.getNamespace() + "." + service.getName(), service.getMeta().version); + } + + private static String getKey(String name, String version) { + return name + ":" + version; + } + + public Map getServices() { + return services; + } + + public ZookeeperWatcher getZookeeperWatcher() { + return zookeeperWatcher; + } + + public void setZookeeperWatcher(ZookeeperWatcher zookeeperWatcher) { + this.zookeeperWatcher = zookeeperWatcher; + } + +} diff --git a/dapeng-service-invoke/src/main/java/com/isuwang/service/invoke/BaseController.java b/dapeng-service-invoke/src/main/java/com/isuwang/service/invoke/BaseController.java new file mode 100644 index 00000000..f62298ab --- /dev/null +++ b/dapeng-service-invoke/src/main/java/com/isuwang/service/invoke/BaseController.java @@ -0,0 +1,77 @@ +package com.isuwang.service.invoke; + +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.isuwang.dapeng.core.SoaHeader; +import com.isuwang.dapeng.core.SoaSystemEnvProperties; +import com.isuwang.service.invoke.entity.BaseRequest; +import com.isuwang.dapeng.registry.RegistryAgent; +import com.isuwang.dapeng.registry.RegistryAgentProxy; +import com.isuwang.dapeng.remoting.BaseClient; +import com.isuwang.dapeng.remoting.fake.json.JSONPost; +import com.isuwang.dapeng.remoting.filter.LoadBalanceFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ServiceLoader; + +/** + * Created by lihuimin on 2017/12/6. + */ +public class BaseController { + + private static final Logger LOGGER = LoggerFactory.getLogger(BaseController.class); + + private static JSONPost jsonPost; + + static{ + if (!SoaSystemEnvProperties.SOA_REMOTING_MODE.equals("local")) { + try { + + ServiceLoader registryAgentLoader = ServiceLoader.load(RegistryAgent.class,BaseClient.class.getClassLoader()); + for (RegistryAgent registryAgent : registryAgentLoader) { + RegistryAgentProxy.setCurrentInstance(RegistryAgentProxy.Type.Client,registryAgent); + RegistryAgentProxy.getCurrentInstance(RegistryAgentProxy.Type.Client).start(); + ApiServices.init(); + new ZookeeperWatcher(true).init(); + } + } catch (Exception e) { + LOGGER.error("Load registry error", e); + } + } else { + LOGGER.info("soa remoting mode is {},client not load registry", SoaSystemEnvProperties.SOA_REMOTING_MODE); + } + + } + + + public static String invoke(BaseRequest baseRequest){ + JsonObject jsonObjectParameter = new JsonParser().parse(baseRequest.getJsonParameter()).getAsJsonObject(); + com.isuwang.dapeng.core.metadata.Service service = ApiServices.getService(baseRequest.getServiceName(), baseRequest.getVersionName()); + + String callerInfo = LoadBalanceFilter.getCallerInfo(baseRequest.getServiceName() , baseRequest.getVersionName(), baseRequest.getVersionName()); + + SoaHeader header = new SoaHeader(); + header.setServiceName(baseRequest.getServiceName()); + header.setVersionName(baseRequest.getVersionName()); + header.setMethodName(baseRequest.getMethodName()); + + String parameter = jsonObjectParameter.toString(); + + if (callerInfo == null && SoaSystemEnvProperties.SOA_REMOTING_MODE.equals("local")) { + jsonPost = new JSONPost(SoaSystemEnvProperties.SOA_SERVICE_IP, SoaSystemEnvProperties.SOA_SERVICE_PORT, true); + } else if (callerInfo != null) { + String[] infos = callerInfo.split(":"); + jsonPost = new JSONPost(infos[0], Integer.valueOf(infos[1]), true); + } else { + return String.format("{\"responseCode\":\"%s\", \"responseMsg\":\"%s\", \"success\":\"%s\"}", "Err-Core-098", "无可用的服务实例", "{}"); + } + try { + return jsonPost.callServiceMethod(header, parameter, service); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + } + return null; + } + +} diff --git a/dapeng-service-invoke/src/main/java/com/isuwang/service/invoke/ZookeeperWatcher.java b/dapeng-service-invoke/src/main/java/com/isuwang/service/invoke/ZookeeperWatcher.java new file mode 100644 index 00000000..7569b27c --- /dev/null +++ b/dapeng-service-invoke/src/main/java/com/isuwang/service/invoke/ZookeeperWatcher.java @@ -0,0 +1,431 @@ +package com.isuwang.service.invoke; + +import com.isuwang.dapeng.core.SoaSystemEnvProperties; +import com.isuwang.dapeng.core.version.Version; +import com.isuwang.dapeng.registry.ConfigKey; +import com.isuwang.dapeng.registry.ServiceInfo; +import com.isuwang.dapeng.registry.ServiceInfos; +import org.apache.zookeeper.*; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.UnsupportedEncodingException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Created by tangliu on 2016/2/29. + */ +public class ZookeeperWatcher { + + + private static final Logger LOGGER = LoggerFactory.getLogger(com.isuwang.dapeng.registry.zookeeper.ZookeeperWatcher.class); + + private final boolean isClient; + + private final static Map> caches = new ConcurrentHashMap<>(); + private final static Map> config = new ConcurrentHashMap<>(); + + private ZooKeeper zk; + private CountDownLatch connectDownLatch; + + public ZookeeperWatcher(boolean isClient) { + this.isClient = isClient; + } + + public void init() { + connect(); + + if (isClient) { + getServersList(); + } + + getConfig("/soa/config"); + + try { + connectDownLatch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.error(e.getMessage(), e); + } + } + + + private void tryCreateNode(String path) { + + String[] paths = path.split("/"); + + String createPath = "/"; + for (int i = 1; i < paths.length; i++) { + createPath += paths[i]; + addPersistServerNode(createPath, path); + createPath += "/"; + } + } + + /** + * 添加持久化的节点 + * + * @param path + * @param data + */ + private void addPersistServerNode(String path, String data) { + Stat stat = exists(path); + + if (stat == null) + zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, nodeCreatedCallBack, data); + } + + /** + * 判断节点是否存在 + * + * @param path + * @return + */ + private Stat exists(String path) { + Stat stat = null; + try { + stat = zk.exists(path, false); + } catch (KeeperException e) { + } catch (InterruptedException e) { + } + return stat; + } + + /** + * 异步添加serverName节点的回调处理 + */ + private AsyncCallback.StringCallback nodeCreatedCallBack = (rc, path, ctx, name) -> { + switch (KeeperException.Code.get(rc)) { + case CONNECTIONLOSS: + LOGGER.info("创建节点:{},连接断开,重新创建", path); + tryCreateNode((String) ctx); //每次创建都会从根节点开始尝试创建,避免根节点未创建而造成创建失败 +// addPersistServerNode(path, (String) ctx); + break; + case OK: + LOGGER.info("创建节点:{},成功", path); + break; + case NODEEXISTS: + LOGGER.info("创建节点:{},已存在", path); + break; + default: + LOGGER.info("创建节点:{},失败", path); + } + }; + + + public void destroy() { + if (zk != null) { + try { + zk.close(); + zk = null; + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + caches.clear(); + config.clear(); + + LOGGER.info("关闭连接,清空service info caches"); + } + + public ServiceInfos getServiceInfo(String serviceName, String versionName, boolean compatible) { + + List serverList = caches.get(serviceName); + + List usableList = new ArrayList<>(); + + if (serverList != null && serverList.size() > 0) { + if (!compatible) { + usableList.addAll(serverList.stream().filter(server -> server.getVersionName().equals(versionName)).collect(Collectors.toList())); + } else { + usableList.addAll(serverList.stream().filter(server -> Version.toVersion(server.getVersionName()).compatibleTo(Version.toVersion(versionName))).collect(Collectors.toList())); + } + } + ServiceInfos serviceInfos = new ServiceInfos(true, usableList); + + return serviceInfos; + } + + //----------------------servicesList相关----------------------------------- + + /** + * 获取zookeeper中的services节点的子节点,并设置监听器 + * + * @return + */ + public void getServersList() { + + tryCreateNode("/soa/runtime/services"); + + zk.getChildren("/soa/runtime/services", watchedEvent -> { + //Children发生变化,则重新获取最新的services列表 + if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) { + LOGGER.info("{}子节点发生变化,重新获取子节点...", watchedEvent.getPath()); + + getServersList(); + } + }, (rc, path, ctx, children) -> { + switch (KeeperException.Code.get(rc)) { + case CONNECTIONLOSS: + getServersList(); + + break; + case OK: + LOGGER.info("获取services列表成功"); + + resetServiceCaches(path, children); + break; + default: + LOGGER.error("get services list fail"); + } + }, null); + } + + //----------------------servicesList相关----------------------------------- + + + //----------------------serviceInfo相关----------------------------------- + + /** + * 对每一个serviceName,要获取serviceName下的子节点 + * + * @param path + * @param serviceList + */ + private void resetServiceCaches(String path, List serviceList) { + for (String serviceName : serviceList) { + getServiceInfoByPath(path + "/" + serviceName, serviceName); + } + } + + /** + * 根据serviceName节点的路径,获取下面的子节点,并监听子节点变化 + * + * @param servicePath + */ + private void getServiceInfoByPath(String servicePath, String serviceName) { + zk.getChildren(servicePath, watchedEvent -> { + if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) { + LOGGER.info("{}子节点发生变化,重新获取信息", watchedEvent.getPath()); + + String[] paths = watchedEvent.getPath().split("/"); + getServiceInfoByPath(watchedEvent.getPath(), paths[paths.length - 1]); + } + }, (rc, path, ctx, children) -> { + switch (KeeperException.Code.get(rc)) { + case CONNECTIONLOSS: + getServiceInfoByPath(path, (String) ctx); + break; + case OK: + LOGGER.info("获取{}的子节点成功", path); + + resetServiceInfoByName((String) ctx, path, children); + break; + default: + LOGGER.error("获取{}的子节点失败", path); + } + }, serviceName); + } + + /** + * serviceName下子节点列表即可用服务地址列表 + * 子节点命名为:host:port:versionName + * + * @param serviceName + * @param path + * @param infos + */ + private void resetServiceInfoByName(String serviceName, String path, List infos) { + LOGGER.info(serviceName + "\n" + infos); + + List sinfos = new ArrayList<>(); + + for (String info : infos) { + String[] serviceInfo = info.split(":"); + ServiceInfo sinfo = new ServiceInfo(serviceInfo[0], Integer.valueOf(serviceInfo[1]), serviceInfo[2]); + sinfos.add(sinfo); + } + + if (caches.containsKey(serviceName)) { + List currentInfos = caches.get(serviceName); + + for (ServiceInfo sinfo : sinfos) { + for (ServiceInfo currentSinfo : currentInfos) { + if (sinfo.equalTo(currentSinfo)) { + sinfo.setActiveCount(currentSinfo.getActiveCount()); + break; + } + } + } + } + caches.put(serviceName, sinfos); + new ApiServices().reloadServices(); + } + + /** + * 获取可用服务 IP + Port + Version + * + * @param + */ + public static Map getAvailableServices() { + Map> availableServices = new ConcurrentHashMap<>(); + Set servicesKey = caches.keySet(); + for (String key : servicesKey) { + List serviceInfos = caches.get(key); + if (serviceInfos.size() > 0) { + availableServices.put(key, serviceInfos); + } + } + return availableServices; + } + + + //----------------------servicesInfo相关----------------------------------- + + //----------------------static config------------------------------------- + private void getConfig(String path) { + + + //每次getConfig之前,先判断父节点是否存在,若不存在,则创建 + tryCreateNode("/soa/config"); + + zk.getChildren(path, watchedEvent -> { + if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) { + LOGGER.info(watchedEvent.getPath() + "'s children changed, reset config in memory"); + + getConfig(watchedEvent.getPath()); + } + }, (rc, path1, ctx, children) -> { + switch (KeeperException.Code.get(rc)) { + case CONNECTIONLOSS: + LOGGER.info("connect loss, reset {} config in memory", path1); + + getConfig(path1); + break; + case OK: + LOGGER.info("get children of {} succeed.", path1); + + resetConfigCache(path1, children); + + break; + default: + LOGGER.error("get chileren of {} failed", path1); + } + }, null); + } + + private void resetConfigCache(String path, List children) { + for (String key : children) { + String configNodePath = path + "/" + key; + + getConfigData(configNodePath, key); + } + } + + private void getConfigData(String path, String configNodeName) { + if (configNodeName == null) { + String[] tmp = path.split("/"); + configNodeName = tmp[tmp.length - 1]; + } + + zk.getData(path, watchedEvent -> { + if (watchedEvent.getType() == Watcher.Event.EventType.NodeDataChanged) { + LOGGER.info(watchedEvent.getPath() + "'s data changed, reset config in memory"); + getConfigData(watchedEvent.getPath(), null); + } + }, (rc, path1, ctx, data, stat) -> { + switch (KeeperException.Code.get(rc)) { + case CONNECTIONLOSS: + getConfigData(path1, (String) ctx); + break; + case OK: + processConfigData((String) ctx, data); + break; + default: + LOGGER.error("Error when trying to get data of {}.", path1); + } + }, configNodeName); + } + + private void processConfigData(String configNode, byte[] data) { + Map propertyMap = new HashMap<>(); + try { + String propertiesStr = new String(data, "utf-8"); + + String[] properties = propertiesStr.split(";"); + for (String property : properties) { + + String[] key_values = property.split("="); + if (key_values.length == 2) { + + ConfigKey type = ConfigKey.findByValue(key_values[0]); + switch (type) { + + case Thread: + Integer value = Integer.valueOf(key_values[1]); + propertyMap.put(type, value); + break; + case ThreadPool: + Boolean bool = Boolean.valueOf(key_values[1]); + propertyMap.put(type, bool); + break; + case Timeout: + Integer timeout = Integer.valueOf(key_values[1]); + propertyMap.put(type, timeout); + break; + case LoadBalance: + propertyMap.put(type, key_values[1]); + break; + case FailOver: + propertyMap.put(type, Integer.valueOf(key_values[1])); + break; + case Compatible: + propertyMap.put(type, key_values[1].split(",")); + break; + } + } + } + + LOGGER.info("get config form {} with data [{}]", configNode, propertiesStr); + } catch (UnsupportedEncodingException e) { + LOGGER.error(e.getMessage(), e); + } + + config.put(configNode, propertyMap); + } + + //---------------------static config end----------------------------------- + + /** + * 连接zookeeper + */ + private void connect() { + try { + connectDownLatch = new CountDownLatch(1); + + zk = new ZooKeeper(SoaSystemEnvProperties.SOA_ZOOKEEPER_HOST, 15000, e -> { + if (e.getState() == Watcher.Event.KeeperState.Expired) { + LOGGER.info("{} 到zookeeper Server的session过期,重连", isClient ? "Client's" : "Server's"); + + destroy(); + + init(); + } else if (e.getState() == Watcher.Event.KeeperState.SyncConnected) { + LOGGER.info("{} Zookeeper Watcher 已连接 zookeeper Server", isClient ? "Client's" : "Server's"); +// connectDownLatch.countDown(); + } + }); + } catch (Exception e) { + LOGGER.info(e.getMessage(), e); + } + } + + public Map> getConfig() { + return config; + } +} diff --git a/dapeng-service-invoke/src/main/java/com/isuwang/service/invoke/entity/BaseRequest.java b/dapeng-service-invoke/src/main/java/com/isuwang/service/invoke/entity/BaseRequest.java new file mode 100644 index 00000000..9f6b3f18 --- /dev/null +++ b/dapeng-service-invoke/src/main/java/com/isuwang/service/invoke/entity/BaseRequest.java @@ -0,0 +1,57 @@ +package com.isuwang.service.invoke.entity; + +/** + * Created by lihuimin on 2017/12/6. + */ +public class BaseRequest { + + private String jsonParameter; + + private String serviceName; + + private String versionName; + + private String methodName; + + public BaseRequest() { + } + + public BaseRequest(String jsonParameter, String serviceName, String versionName, String methodName) { + this.jsonParameter = jsonParameter; + this.serviceName = serviceName; + this.versionName = versionName; + this.methodName = methodName; + } + + public String getJsonParameter() { + return jsonParameter; + } + + public void setJsonParameter(String jsonParameter) { + this.jsonParameter = jsonParameter; + } + + public String getServiceName() { + return serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + public String getVersionName() { + return versionName; + } + + public void setVersionName(String versionName) { + this.versionName = versionName; + } + + public String getMethodName() { + return methodName; + } + + public void setMethodName(String methodName) { + this.methodName = methodName; + } +} diff --git a/dapeng-service-invoke/src/test/java/com/isuwang/service/invoke/Test.java b/dapeng-service-invoke/src/test/java/com/isuwang/service/invoke/Test.java new file mode 100644 index 00000000..8b42f12c --- /dev/null +++ b/dapeng-service-invoke/src/test/java/com/isuwang/service/invoke/Test.java @@ -0,0 +1,27 @@ +package com.isuwang.service.invoke; + +import com.isuwang.service.invoke.BaseController; +import com.isuwang.service.invoke.entity.BaseRequest; + +import java.io.IOException; + +/** + * Created by lihuimin on 2017/12/6. + */ +public class Test { + + public static void main(String[] args) throws IOException { + System.setProperty("soa.zookeeper.host","192.168.99.100:2181"); + + String jsonParam = "{\"id\": 210}"; + + BaseRequest request = new BaseRequest(jsonParam,"com.isuwang.soa.company.service.CompanyService","1.0.0","findCompanyById"); + + BaseController controller = new BaseController(); + String result = controller.invoke(request); + + System.out.println(result); + + + } +} diff --git a/pom.xml b/pom.xml index 5bffc090..4082d374 100644 --- a/pom.xml +++ b/pom.xml @@ -51,6 +51,7 @@ dapeng-route dapeng-tools dapeng-message + dapeng-service-invoke