Skip to content

Commit

Permalink
Merge branch 'master' of github.com:purplefox/vert.x
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox committed May 17, 2012
2 parents 897e5b3 + 2c7230c commit f3a4076
Show file tree
Hide file tree
Showing 25 changed files with 517 additions and 31 deletions.
2 changes: 1 addition & 1 deletion src/main/java/org/vertx/java/core/Vertx.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
*
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public abstract class Vertx {
public abstract class Vertx implements VertxMXBean {

private static VertxFactory loadFactory() {
ServiceLoader<VertxFactory> factories = ServiceLoader.load(VertxFactory.class);
Expand Down
27 changes: 27 additions & 0 deletions src/main/java/org/vertx/java/core/VertxMXBean.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.vertx.java.core;

/**
* @author pidster
*
*/
public interface VertxMXBean {

int getBackgroundPoolSize();

int getCorePoolSize();
}
2 changes: 1 addition & 1 deletion src/main/java/org/vertx/java/core/eventbus/EventBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
*
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public interface EventBus {
public interface EventBus extends EventBusMXBean {

/**
* Send a JSON object as a message
Expand Down
28 changes: 28 additions & 0 deletions src/main/java/org/vertx/java/core/eventbus/EventBusMXBean.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.vertx.java.core.eventbus;

/**
* @author pidster
*
*/
public interface EventBusMXBean {

long getSent();

long getReceived();

}
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,15 @@ protected BaseMessage(Buffer readBuff) {
}

protected void write(NetSocket socket) {
int length = 1 + 4 + address.length() + 1 + 4 * sender.host.length() +
int length = 1 + 4 + address.length() + 1 + 4 * sender.getHost().length() +
4 + (replyAddress == null ? 0 : replyAddress.length()) +
getBodyLength();
Buffer totBuff = new Buffer(length);
totBuff.appendInt(0);
totBuff.appendByte(type());
writeString(totBuff, address);
totBuff.appendInt(sender.port);
writeString(totBuff, sender.host);
totBuff.appendInt(sender.getPort());
writeString(totBuff, sender.getHost());
if (replyAddress != null) {
writeString(totBuff, replyAddress);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.vertx.java.core.net.impl.ServerID;
import org.vertx.java.core.parsetools.RecordParser;

import java.beans.ConstructorProperties;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.Map;
Expand All @@ -43,6 +44,9 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

import javax.management.NotificationBroadcasterSupport;

/**
*
Expand All @@ -52,6 +56,8 @@ public class DefaultEventBus implements EventBus {

private static final Logger log = LoggerFactory.getLogger(DefaultEventBus.class);

private final NotificationBroadcasterSupport notificationBroadcasterSupport = new NotificationBroadcasterSupport();

private static final Buffer PONG = new Buffer(new byte[] { (byte)1 });
private static final long PING_INTERVAL = 20000;
private static final long PING_REPLY_INTERVAL = 20000;
Expand All @@ -64,7 +70,12 @@ public class DefaultEventBus implements EventBus {
private final ConcurrentMap<String, Map<HandlerHolder, String>> handlers = new ConcurrentHashMap<>();
private final Map<String, ServerID> replyAddressCache = new ConcurrentHashMap<>();
private final Map<String, HandlerInfo> handlersByID = new ConcurrentHashMap<>();

private final AtomicLong sent = new AtomicLong(0);

private final AtomicLong received = new AtomicLong(0);

@ConstructorProperties("vertx")
public DefaultEventBus(VertxInternal vertx) {
// Just some dummy server ID
this.vertx = vertx;
Expand All @@ -73,17 +84,29 @@ public DefaultEventBus(VertxInternal vertx) {
this.subs = null;
}

@ConstructorProperties({"vertx", "hostname"})
public DefaultEventBus(VertxInternal vertx, String hostname) {
this(vertx, DEFAULT_CLUSTER_PORT, hostname);
}

@ConstructorProperties({"vertx", "port", "hostname"})
public DefaultEventBus(VertxInternal vertx, int port, String hostname) {
this.vertx = vertx;
this.serverID = new ServerID(port, hostname);
ClusterManager mgr = new HazelcastClusterManager(vertx);
subs = mgr.getSubsMap("subs");
this.subs = mgr.getSubsMap("subs");
this.server = setServer();
}

@Override
public long getSent() {
return sent.get();
}

@Override
public long getReceived() {
return received.get();
}

public void send(String address, JsonObject message, final Handler<Message<JsonObject>> replyHandler) {
send(new JsonMessage(address, message), replyHandler);
Expand Down Expand Up @@ -274,7 +297,7 @@ public void handle(Buffer buff) {
parser.setOutput(handler);
socket.dataHandler(parser);
}
}).listen(serverID.port, serverID.host);
}).listen(serverID.getPort(), serverID.getHost());
}

private void sendToSubs(Collection<ServerID> subs, BaseMessage message) {
Expand Down Expand Up @@ -440,6 +463,7 @@ private void sendRemote(final ServerID serverID, final BaseMessage message) {
holder.connect(client, serverID, message.address);
}
}
sent.incrementAndGet();
holder.writeMessage(message);
}

Expand Down Expand Up @@ -485,6 +509,7 @@ private void receiveMessage(BaseMessage msg) {
replyAddressCache.put(msg.replyAddress, msg.sender);
}
msg.bus = this;
received.incrementAndGet();
final Map<HandlerHolder, String> map = handlers.get(msg.address);
if (map != null) {
boolean replyHandler = false;
Expand Down Expand Up @@ -590,7 +615,7 @@ public void handle(Buffer data) {
}

void connect(NetClient client, final ServerID serverID, final String address) {
client.connect(serverID.port, serverID.host, new Handler<NetSocket>() {
client.connect(serverID.getPort(), serverID.getHost(), new Handler<NetSocket>() {
public void handle(final NetSocket socket) {
connected(socket, address);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ public HazelcastServerID(ServerID serverID) {

@Override
public void writeData(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(serverID.port);
dataOutput.writeUTF(serverID.host);
dataOutput.writeInt(serverID.getPort());
dataOutput.writeUTF(serverID.getHost());
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/vertx/java/core/http/HttpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
*
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public interface HttpServer {
public interface HttpServer extends HttpServerMXBean {

/**
* Set the request handler for the server to {@code requestHandler}. As HTTP requests are received by the server,
Expand Down
46 changes: 46 additions & 0 deletions src/main/java/org/vertx/java/core/http/HttpServerMXBean.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.vertx.java.core.http;

/**
* @author pidster
*
*/
public interface HttpServerMXBean {

int getPort();

String getHost();

boolean isSSL();

Boolean isTCPNoDelay();

Integer getSendBufferSize();

Integer getReceiveBufferSize();

Boolean isTCPKeepAlive();

Boolean isReuseAddress();

Boolean isSoLinger();

Integer getTrafficClass();

Integer getAcceptBacklog();

}
25 changes: 23 additions & 2 deletions src/main/java/org/vertx/java/core/http/impl/DefaultHttpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.vertx.java.core.http.impl.ws.hybi17.HandshakeRFC6455;
import org.vertx.java.core.impl.Context;
import org.vertx.java.core.impl.VertxInternal;
import org.vertx.java.core.jmx.JMXUtil;
import org.vertx.java.core.logging.Logger;
import org.vertx.java.core.logging.impl.LoggerFactory;
import org.vertx.java.core.net.impl.HandlerHolder;
Expand All @@ -68,6 +69,8 @@
import org.vertx.java.core.net.impl.VertxWorkerPool;

import javax.net.ssl.SSLEngine;

import java.beans.ConstructorProperties;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
Expand Down Expand Up @@ -109,6 +112,7 @@ public class DefaultHttpServer implements HttpServer {
private HandlerManager<HttpServerRequest> reqHandlerManager = new HandlerManager<>(availableWorkers);
private HandlerManager<ServerWebSocket> wsHandlerManager = new HandlerManager<>(availableWorkers);

@ConstructorProperties("vertx")
public DefaultHttpServer(VertxInternal vertx) {
this.vertx = vertx;
ctx = vertx.getOrAssignContext();
Expand All @@ -123,6 +127,9 @@ public void run() {
}

public HttpServer requestHandler(Handler<HttpServerRequest> requestHandler) {

String simpleName = requestHandler.getClass().getSimpleName();
JMXUtil.register(requestHandler, "org.vertx:type=Handler,type=HTTP,name=%s", simpleName);
this.requestHandler = requestHandler;
return this;
}
Expand All @@ -132,13 +139,27 @@ public Handler<HttpServerRequest> requestHandler() {
}

public HttpServer websocketHandler(Handler<ServerWebSocket> wsHandler) {
String simpleName = requestHandler.getClass().getSimpleName();
JMXUtil.register(requestHandler, "org.vertx:type=Handler,type=WebSocket,name=%s", simpleName);
this.wsHandler = wsHandler;
return this;
}

public Handler<ServerWebSocket> websocketHandler() {
return wsHandler;
}

public ServerID getServerID() {
return id;
}

public int getPort() {
return id.getPort();
}

public String getHost() {
return id.getHost();
}

public HttpServer listen(int port) {
return listen(port, "0.0.0.0");
Expand Down Expand Up @@ -206,7 +227,7 @@ public ChannelPipeline getPipeline() {
} catch (UnknownHostException e) {
log.error("Failed to bind", e);
}
vertx.sharedHttpServers().put(id, this);
vertx.registerSharedHttpServer(id, this);
actualServer = this;
} else {
// Server already exists with that host/port - we will use that
Expand Down Expand Up @@ -385,7 +406,7 @@ public String getTrustStorePassword() {

private void actualClose(final Context closeContext, final Handler<Void> done) {
if (id != null) {
vertx.sharedHttpServers().remove(id);
vertx.unregisterSharedHttpServer(id);
}

for (ServerConnection conn : connectionMap.values()) {
Expand Down
Loading

0 comments on commit f3a4076

Please sign in to comment.