Skip to content

Commit

Permalink
optimize: support instance registration to the registry center
Browse files Browse the repository at this point in the history
  • Loading branch information
funky-eyes committed Jan 2, 2025
1 parent dc08160 commit 1d45946
Show file tree
Hide file tree
Showing 12 changed files with 97 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.common.metadata.namingserver;
package org.apache.seata.common.metadata;


import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seata.common.metadata.ClusterRole;
import org.apache.seata.common.metadata.Node;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import static org.apache.seata.common.util.CollectionUtils.mapToJsonString;


public class Instance {
private String namespace;
private String clusterName;
Expand Down Expand Up @@ -169,25 +164,6 @@ public String toJsonString(ObjectMapper objectMapper) {
}
}


public Map<String, String> toMap() {
Map<String, String> resultMap = new HashMap<>();


resultMap.put("namespace", namespace);
resultMap.put("clusterName", clusterName);
resultMap.put("unit", unit);
resultMap.put("control", control.toString());
resultMap.put("transaction", transaction.toString());
resultMap.put("weight", String.valueOf(weight));
resultMap.put("healthy", String.valueOf(healthy));
resultMap.put("term", String.valueOf(term));
resultMap.put("timestamp", String.valueOf(timestamp));
resultMap.put("metadata", mapToJsonString(metadata));

return resultMap;
}

private static class SingletonHolder {
private static final Instance SERVER_INSTANCE = new Instance();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seata.common.metadata.ClusterRole;
import org.apache.seata.common.metadata.Instance;
import org.apache.seata.common.metadata.Node;
import org.junit.jupiter.api.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.netty.handler.timeout.IdleStateHandler;
import org.apache.seata.common.ConfigurationKeys;
import org.apache.seata.common.XID;
import org.apache.seata.common.metadata.Instance;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.rpc.RemotingBootstrap;
Expand Down Expand Up @@ -170,9 +171,8 @@ public void initChannel(SocketChannel ch) {
try {
this.serverBootstrap.bind(port).sync();
LOGGER.info("Server started, service listen port: {}", getListenPort());
InetSocketAddress address = new InetSocketAddress(XID.getIpAddress(), XID.getPort());
for (RegistryService<?> registryService : MultiRegistryFactory.getInstances()) {
registryService.register(address);
registryService.register(Instance.getInstance());
}
initialized.set(true);
} catch (SocketException se) {
Expand All @@ -189,9 +189,8 @@ public void shutdown() {
LOGGER.info("Shutting server down, the listen port: {}", XID.getPort());
}
if (initialized.get()) {
InetSocketAddress address = new InetSocketAddress(XID.getIpAddress(), XID.getPort());
for (RegistryService registryService : MultiRegistryFactory.getInstances()) {
registryService.unregister(address);
registryService.unregister(Instance.getInstance());
registryService.close();
}
//wait a few seconds for server transport
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.seata.discovery.registry;

import org.apache.seata.common.metadata.Instance;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.config.ConfigurationFactory;

Expand Down Expand Up @@ -61,16 +62,40 @@ public interface RegistryService<T> {
* @param address the address
* @throws Exception the exception
*/
@Deprecated
void register(InetSocketAddress address) throws Exception;

/**
* Register.
*
* @param instance the address
* @throws Exception the exception
*/
default void register(Instance instance) throws Exception{
InetSocketAddress inetSocketAddress = new InetSocketAddress(instance.getTransaction().getHost(), instance.getTransaction().getPort());
register(inetSocketAddress);
}

/**
* Unregister.
*
* @param address the address
* @throws Exception the exception
*/
@Deprecated
void unregister(InetSocketAddress address) throws Exception;

/**
* Unregister.
*
* @param instance the instance
* @throws Exception the exception
*/
default void unregister(Instance instance) throws Exception{
InetSocketAddress inetSocketAddress = new InetSocketAddress(instance.getTransaction().getHost(), instance.getTransaction().getPort());
unregister(inetSocketAddress);
}

/**
* Subscribe.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.apache.http.protocol.HTTP;
import org.apache.http.util.EntityUtils;
import org.apache.seata.common.metadata.Node;
import org.apache.seata.common.metadata.namingserver.Instance;
import org.apache.seata.common.metadata.Instance;
import org.apache.seata.common.metadata.namingserver.MetaResponse;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.common.util.CollectionUtils;
Expand Down Expand Up @@ -148,9 +148,11 @@ static NamingserverRegistryServiceImpl getInstance() {

@Override
public void register(InetSocketAddress address) throws Exception {
NetUtil.validAddress(address);
Instance instance = Instance.getInstance();
instance.setTransaction(new Node.Endpoint(address.getAddress().getHostAddress(), address.getPort(), "netty"));
register(Instance.getInstance());
}

@Override
public void register(Instance instance) throws Exception {
instance.setTimestamp(System.currentTimeMillis());
doRegister(instance, getNamingAddrs());
}
Expand Down Expand Up @@ -198,11 +200,15 @@ public boolean doHealthCheck(String url) {
}
}



@Override
public void unregister(InetSocketAddress inetSocketAddress) {
unregister(Instance.getInstance());
}

@Override
public void unregister(InetSocketAddress address) {
NetUtil.validAddress(address);
Instance instance = Instance.getInstance();
instance.setTransaction(new Node.Endpoint(address.getAddress().getHostAddress(), address.getPort(), "netty"));
public void unregister(Instance instance) {
for (String urlSuffix : getNamingAddrs()) {
String url = HTTP_PREFIX + urlSuffix + "/naming/v1/unregister?";
String unit = instance.getUnit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.seata.server.controller;

import org.apache.seata.common.metadata.namingserver.Instance;
import org.apache.seata.common.metadata.Instance;
import org.apache.seata.common.result.Result;
import org.apache.seata.config.Configuration;
import org.apache.seata.config.ConfigurationFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
*/
package org.apache.seata.server.instance;

import org.apache.seata.common.XID;
import org.apache.seata.common.holder.ObjectHolder;
import org.apache.seata.common.metadata.Instance;
import org.apache.seata.common.metadata.Node;
import org.apache.seata.common.metadata.namingserver.Instance;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.common.util.StringUtils;
Expand Down Expand Up @@ -61,53 +62,57 @@ public class ServerInstance {
private static final Logger LOGGER = LoggerFactory.getLogger(Server.class);

public void serverInstanceInit() {
if (StringUtils.equals(registryProperties.getType(), NAMING_SERVER)) {
VGroupMappingStoreManager vGroupMappingStoreManager = SessionHolder.getRootVGroupMappingManager();
EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("scheduledExcuter", 1, true));
ConfigurableEnvironment environment = (ConfigurableEnvironment) ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT);

// load node properties
Instance instance = Instance.getInstance();
// load namespace
String namespace = registryNamingServerProperties.getNamespace();
instance.setNamespace(namespace);
// load cluster name
String clusterName = registryNamingServerProperties.getCluster();
instance.setClusterName(clusterName);

// load cluster type
String clusterType = String.valueOf(StoreConfig.getSessionMode());
instance.addMetadata("cluster-type", "raft".equals(clusterType) ? clusterType : "default");

// load unit name
instance.setUnit(String.valueOf(UUID.randomUUID()));

instance.setTerm(System.currentTimeMillis());

// load node Endpoint
instance.setControl(new Node.Endpoint(NetUtil.getLocalIp(), Integer.parseInt(Objects.requireNonNull(environment.getProperty("server.port"))), "http"));

// load metadata
for (PropertySource<?> propertySource : environment.getPropertySources()) {
if (propertySource instanceof EnumerablePropertySource) {
EnumerablePropertySource<?> enumerablePropertySource = (EnumerablePropertySource<?>) propertySource;
for (String propertyName : enumerablePropertySource.getPropertyNames()) {
if (propertyName.startsWith(META_PREFIX)) {
instance.addMetadata(propertyName.substring(META_PREFIX.length()), enumerablePropertySource.getProperty(propertyName));
}
VGroupMappingStoreManager vGroupMappingStoreManager = SessionHolder.getRootVGroupMappingManager();
ConfigurableEnvironment environment =
(ConfigurableEnvironment)ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT);

// load node properties
Instance instance = Instance.getInstance();
// load namespace
String namespace = registryNamingServerProperties.getNamespace();
instance.setNamespace(namespace);
// load cluster name
String clusterName = registryNamingServerProperties.getCluster();
instance.setClusterName(clusterName);

// load cluster type
String clusterType = String.valueOf(StoreConfig.getSessionMode());
instance.addMetadata("cluster-type", "raft".equals(clusterType) ? clusterType : "default");

// load unit name
instance.setUnit(String.valueOf(UUID.randomUUID()));

instance.setTerm(System.currentTimeMillis());

// load node Endpoint
instance.setControl(new Node.Endpoint(NetUtil.getLocalIp(),
Integer.parseInt(Objects.requireNonNull(environment.getProperty("server.port"))), "http"));

// load metadata
for (PropertySource<?> propertySource : environment.getPropertySources()) {
if (propertySource instanceof EnumerablePropertySource) {
EnumerablePropertySource<?> enumerablePropertySource = (EnumerablePropertySource<?>)propertySource;
for (String propertyName : enumerablePropertySource.getPropertyNames()) {
if (propertyName.startsWith(META_PREFIX)) {
instance.addMetadata(propertyName.substring(META_PREFIX.length()),
enumerablePropertySource.getProperty(propertyName));
}
}
}
}
instance.setTransaction(new Node.Endpoint(XID.getIpAddress(), XID.getPort(),"netty"));
if (StringUtils.equals(registryProperties.getType(), NAMING_SERVER)) {
// load vgroup mapping relationship
instance.addMetadata("vGroup", vGroupMappingStoreManager.loadVGroups());

EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("scheduledExcuter", 1, true));
EXECUTOR_SERVICE.scheduleAtFixedRate(() -> {
try {
vGroupMappingStoreManager.notifyMapping();
} catch (Exception e) {
LOGGER.error("Naming server register Exception", e);
}
}, registryNamingServerProperties.getHeartbeatPeriod(), registryNamingServerProperties.getHeartbeatPeriod(), TimeUnit.MILLISECONDS);
try {
vGroupMappingStoreManager.notifyMapping();
} catch (Exception e) {
LOGGER.error("Naming server register Exception", e);
}
}, registryNamingServerProperties.getHeartbeatPeriod(), registryNamingServerProperties.getHeartbeatPeriod(),
TimeUnit.MILLISECONDS);
ServerRunner.addDisposable(EXECUTOR_SERVICE::shutdown);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.apache.seata.common.ConfigurationKeys;
import org.apache.seata.common.loader.EnhancedServiceLoader;
import org.apache.seata.common.loader.LoadLevel;
import org.apache.seata.common.metadata.namingserver.Instance;
import org.apache.seata.common.metadata.Instance;
import org.apache.seata.config.Configuration;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.store.MappingDO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import org.apache.seata.common.exception.ErrorCode;
import org.apache.seata.common.exception.SeataRuntimeException;
import org.apache.seata.common.metadata.namingserver.Instance;
import org.apache.seata.common.metadata.Instance;
import org.apache.seata.common.util.IOUtil;
import org.apache.seata.config.Configuration;
import org.apache.seata.config.ConfigurationFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import org.apache.seata.common.exception.RedisException;
import org.apache.seata.common.loader.LoadLevel;
import org.apache.seata.common.metadata.namingserver.Instance;
import org.apache.seata.common.metadata.Instance;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.store.MappingDO;
import org.apache.seata.server.storage.redis.JedisPooledFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.seata.server.store;

import org.apache.seata.common.XID;
import org.apache.seata.common.metadata.namingserver.Instance;
import org.apache.seata.common.metadata.Instance;
import org.apache.seata.core.store.MappingDO;
import org.apache.seata.discovery.registry.MultiRegistryFactory;
import org.apache.seata.discovery.registry.RegistryService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.seata.server.storage.redis.store;

import org.apache.seata.common.metadata.namingserver.Instance;
import org.apache.seata.common.metadata.Instance;
import org.apache.seata.core.store.MappingDO;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
Expand Down

0 comments on commit 1d45946

Please sign in to comment.