diff --git a/bundles/org.jupnp/src/main/java/org/jupnp/DefaultUpnpServiceConfiguration.java b/bundles/org.jupnp/src/main/java/org/jupnp/DefaultUpnpServiceConfiguration.java index d7a031772..ca1c255ed 100644 --- a/bundles/org.jupnp/src/main/java/org/jupnp/DefaultUpnpServiceConfiguration.java +++ b/bundles/org.jupnp/src/main/java/org/jupnp/DefaultUpnpServiceConfiguration.java @@ -22,8 +22,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + import org.jupnp.binding.xml.DeviceDescriptorBinder; import org.jupnp.binding.xml.ServiceDescriptorBinder; import org.jupnp.binding.xml.UDA10DeviceDescriptorBinderImpl; @@ -53,6 +52,8 @@ import org.jupnp.transport.spi.StreamClient; import org.jupnp.transport.spi.StreamServer; import org.jupnp.util.Exceptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Default configuration data of a typical UPnP stack. @@ -88,7 +89,7 @@ public class DefaultUpnpServiceConfiguration implements UpnpServiceConfiguration // set a fairly large core threadpool size, expecting that core timeout policy will // allow the pool to reduce in size after inactivity. note that ThreadPoolExecutor - // only adds threads beyond its core size once the backlog is full, so a low value + // only adds threads beyond its core size once the backlog is full, so a low value // core size is a poor choice when there are lots of long-running + idle jobs. // a brief intro to the issue: // http://www.bigsoft.co.uk/blog/2009/11/27/rules-of-a-threadpoolexecutor-pool-size @@ -130,7 +131,8 @@ public DefaultUpnpServiceConfiguration(int streamListenPort, int multicastRespon } protected DefaultUpnpServiceConfiguration(boolean checkRuntime) { - this(NetworkAddressFactoryImpl.DEFAULT_TCP_HTTP_LISTEN_PORT, NetworkAddressFactoryImpl.DEFAULT_MULTICAST_RESPONSE_LISTEN_PORT, checkRuntime); + this(NetworkAddressFactoryImpl.DEFAULT_TCP_HTTP_LISTEN_PORT, + NetworkAddressFactoryImpl.DEFAULT_MULTICAST_RESPONSE_LISTEN_PORT, checkRuntime); } protected DefaultUpnpServiceConfiguration(int streamListenPort, int multicastResponsePort, boolean checkRuntime) { @@ -173,7 +175,7 @@ public GENAEventProcessor getGenaEventProcessor() { @Override @SuppressWarnings("rawtypes") public StreamClient createStreamClient() { - return transportConfiguration.createStreamClient(getSyncProtocolExecutorService()); + return transportConfiguration.createStreamClient(getSyncProtocolExecutorService(), -1); } @Override @@ -184,12 +186,8 @@ public StreamServer createStreamServer(NetworkAddressFactory networkAddressFacto @Override public MulticastReceiver createMulticastReceiver(NetworkAddressFactory networkAddressFactory) { - return new MulticastReceiverImpl( - new MulticastReceiverConfigurationImpl( - networkAddressFactory.getMulticastGroup(), - networkAddressFactory.getMulticastPort() - ) - ); + return new MulticastReceiverImpl(new MulticastReceiverConfigurationImpl( + networkAddressFactory.getMulticastGroup(), networkAddressFactory.getMulticastPort())); } @Override @@ -296,6 +294,11 @@ public NetworkAddressFactory createNetworkAddressFactory() { return createNetworkAddressFactory(streamListenPort, multicastResponsePort); } + @Override + public int getMaxRequests() { + return -1; + } + @Override public void shutdown() { log.trace("Shutting down default executor service"); @@ -341,29 +344,22 @@ protected ExecutorService createDefaultExecutorService() { public static class JUPnPExecutor extends ThreadPoolExecutor { public JUPnPExecutor() { - this(new JUPnPThreadFactory(), - new ThreadPoolExecutor.DiscardPolicy() { - // The pool is bounded and rejections will happen during shutdown - @Override - public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) { - // Log and discard - LoggerFactory.getLogger(DefaultUpnpServiceConfiguration.class).warn("Thread pool rejected execution of " + runnable.getClass()); - super.rejectedExecution(runnable, threadPoolExecutor); - } - } - ); + this(new JUPnPThreadFactory(), new ThreadPoolExecutor.DiscardPolicy() { + // The pool is bounded and rejections will happen during shutdown + @Override + public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) { + // Log and discard + LoggerFactory.getLogger(DefaultUpnpServiceConfiguration.class) + .warn("Thread pool rejected execution of " + runnable.getClass()); + super.rejectedExecution(runnable, threadPoolExecutor); + } + }); } public JUPnPExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedHandler) { // This is the same as Executors.newCachedThreadPool - super(CORE_THREAD_POOL_SIZE, - THREAD_POOL_SIZE, - 10L, - TimeUnit.SECONDS, - new ArrayBlockingQueue(THREAD_QUEUE_SIZE), - threadFactory, - rejectedHandler - ); + super(CORE_THREAD_POOL_SIZE, THREAD_POOL_SIZE, 10L, TimeUnit.SECONDS, + new ArrayBlockingQueue(THREAD_QUEUE_SIZE), threadFactory, rejectedHandler); allowCoreThreadTimeOut(THREAD_POOL_CORE_TIMEOUT); } @@ -379,7 +375,8 @@ protected void afterExecute(Runnable runnable, Throwable throwable) { return; } // Log only - LoggerFactory.getLogger(DefaultUpnpServiceConfiguration.class).warn("Thread terminated " + runnable + " abruptly with exception: " + throwable); + LoggerFactory.getLogger(DefaultUpnpServiceConfiguration.class) + .warn("Thread terminated " + runnable + " abruptly with exception: " + throwable); LoggerFactory.getLogger(DefaultUpnpServiceConfiguration.class).warn("Root cause: " + cause); } } @@ -399,15 +396,13 @@ public JUPnPThreadFactory() { @Override public Thread newThread(Runnable r) { - Thread t = new Thread( - group, r, - namePrefix + threadNumber.getAndIncrement(), - 0 - ); - if (t.isDaemon()) + Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); + if (t.isDaemon()) { t.setDaemon(false); - if (t.getPriority() != Thread.NORM_PRIORITY) + } + if (t.getPriority() != Thread.NORM_PRIORITY) { t.setPriority(Thread.NORM_PRIORITY); + } return t; } diff --git a/bundles/org.jupnp/src/main/java/org/jupnp/OSGiUpnpServiceConfiguration.java b/bundles/org.jupnp/src/main/java/org/jupnp/OSGiUpnpServiceConfiguration.java index 938ca3ab6..b64456dfa 100644 --- a/bundles/org.jupnp/src/main/java/org/jupnp/OSGiUpnpServiceConfiguration.java +++ b/bundles/org.jupnp/src/main/java/org/jupnp/OSGiUpnpServiceConfiguration.java @@ -90,6 +90,8 @@ public class OSGiUpnpServiceConfiguration implements UpnpServiceConfiguration { private int httpProxyPort = -1; private int streamListenPort = 8080; private Namespace callbackURI = new Namespace("http://localhost/upnpcallback"); + private int retryAfterSeconds = -1; + private int maxRequests = -1; private ExecutorService mainExecutorService; private ExecutorService asyncExecutorService; @@ -146,7 +148,6 @@ protected OSGiUpnpServiceConfiguration(int streamListenPort, int multicastRespon } protected void activate(BundleContext context, Map configProps) throws ConfigurationException { - this.context = context; createConfiguration(configProps); @@ -193,7 +194,7 @@ public GENAEventProcessor getGenaEventProcessor() { @Override @SuppressWarnings("rawtypes") public StreamClient createStreamClient() { - return transportConfiguration.createStreamClient(getSyncProtocolExecutorService()); + return transportConfiguration.createStreamClient(getSyncProtocolExecutorService(), retryAfterSeconds); } @Override @@ -333,6 +334,11 @@ public NetworkAddressFactory createNetworkAddressFactory() { return createNetworkAddressFactory(streamListenPort, multicastResponsePort); } + @Override + public int getMaxRequests() { + return maxRequests; + } + @Override public void shutdown() { log.debug("Shutting down executor services"); @@ -472,6 +478,30 @@ private void createConfiguration(Map properties) throws Configur } else if (prop instanceof Integer) { httpProxyPort = (Integer) prop; } + + prop = properties.get("retryAfterSeconds"); + if (prop instanceof String) { + try { + retryAfterSeconds = Integer.valueOf((String) prop); + } catch (NumberFormatException e) { + log.error("Invalid value '{}' for retryAfterSeconds - using default value", prop); + } + } else if (prop instanceof Integer) { + retryAfterSeconds = (Integer) prop; + } + log.info("OSGiUpnpServiceConfiguration createConfiguration retryAfterSeconds = {}", retryAfterSeconds); + + prop = properties.get("maxRequests"); + if (prop instanceof String) { + try { + maxRequests = Integer.valueOf((String) prop); + } catch (NumberFormatException e) { + log.error("Invalid value '{}' for maxRequests - using default value", prop); + } + } else if (prop instanceof Integer) { + maxRequests = (Integer) prop; + } + log.info("OSGiUpnpServiceConfiguration createConfiguration maxRequests = {}", maxRequests); } } diff --git a/bundles/org.jupnp/src/main/java/org/jupnp/UpnpServiceConfiguration.java b/bundles/org.jupnp/src/main/java/org/jupnp/UpnpServiceConfiguration.java index da5f3b679..12fd53b35 100644 --- a/bundles/org.jupnp/src/main/java/org/jupnp/UpnpServiceConfiguration.java +++ b/bundles/org.jupnp/src/main/java/org/jupnp/UpnpServiceConfiguration.java @@ -108,12 +108,14 @@ public interface UpnpServiceConfiguration { public ExecutorService getStreamServerExecutorService(); /** - * @return The shared implementation of {@link org.jupnp.binding.xml.DeviceDescriptorBinder} for the UPnP 1.0 Device Architecture.. + * @return The shared implementation of {@link org.jupnp.binding.xml.DeviceDescriptorBinder} for the UPnP 1.0 Device + * Architecture.. */ public DeviceDescriptorBinder getDeviceDescriptorBinderUDA10(); /** - * @return The shared implementation of {@link org.jupnp.binding.xml.ServiceDescriptorBinder} for the UPnP 1.0 Device Architecture.. + * @return The shared implementation of {@link org.jupnp.binding.xml.ServiceDescriptorBinder} for the UPnP 1.0 + * Device Architecture.. */ public ServiceDescriptorBinder getServiceDescriptorBinderUDA10(); @@ -142,7 +144,7 @@ public interface UpnpServiceConfiguration { * @return The time in milliseconds to wait between each registry maintenance operation. */ public int getRegistryMaintenanceIntervalMillis(); - + /** * Optional setting for flooding alive NOTIFY messages for local devices. *

@@ -228,7 +230,8 @@ public interface UpnpServiceConfiguration { public Executor getAsyncProtocolExecutor(); /** - * @return The executor service which runs the processing of synchronous aspects of the UPnP stack (description, control, GENA). + * @return The executor service which runs the processing of synchronous aspects of the UPnP stack (description, + * control, GENA). */ public ExecutorService getSyncProtocolExecutorService(); @@ -247,6 +250,8 @@ public interface UpnpServiceConfiguration { */ public Executor getRegistryListenerExecutor(); + public int getMaxRequests(); + /** * Called by the {@link org.jupnp.UpnpService} on shutdown, useful to e.g. shutdown thread pools. */ diff --git a/bundles/org.jupnp/src/main/java/org/jupnp/UpnpServiceImpl.java b/bundles/org.jupnp/src/main/java/org/jupnp/UpnpServiceImpl.java index f398292f6..2be176dce 100644 --- a/bundles/org.jupnp/src/main/java/org/jupnp/UpnpServiceImpl.java +++ b/bundles/org.jupnp/src/main/java/org/jupnp/UpnpServiceImpl.java @@ -47,7 +47,7 @@ *

* Override the various create...() methods to customize instantiation of protocol factory, router, etc. *

- * + * * @author Christian Bauer * @author Kai Kreuzer - OSGiified the service */ @@ -71,6 +71,7 @@ public class UpnpServiceImpl implements UpnpService { protected ScheduledExecutorService scheduledExecutorService; protected volatile ScheduledFuture scheduledFuture; + protected volatile ScheduledFuture routerScheduledFuture; public UpnpServiceImpl() { } @@ -80,11 +81,11 @@ public UpnpServiceImpl(UpnpServiceConfiguration configuration) { } private static ScheduledExecutorService createExecutor() { - return Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + return Executors.newScheduledThreadPool(2, new ThreadFactory() { @Override public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, "Upnp Service Delayed Startup Thread"); + Thread thread = new Thread(runnable, "Upnp Service Thread"); thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() { @Override @@ -140,26 +141,32 @@ protected ControlPoint createControlPoint(ProtocolFactory protocolFactory, Regis return new ControlPointImpl(getConfiguration(), protocolFactory, registry); } + @Override public UpnpServiceConfiguration getConfiguration() { return configuration; } + @Override public ControlPoint getControlPoint() { return controlPoint; } + @Override public ProtocolFactory getProtocolFactory() { return protocolFactory; } + @Override public Registry getRegistry() { return registry; } + @Override public Router getRouter() { return router; } + @Override public synchronized void shutdown() { shutdown(false); } @@ -171,6 +178,10 @@ public void run() { synchronized (lock) { if (isRunning) { log.info("Shutting down UPnP service..."); + if (routerScheduledFuture != null) { + routerScheduledFuture.cancel(true); + routerScheduledFuture = null; + } shutdownRegistry(); shutdownConfiguration(); shutdownRouter(); @@ -241,6 +252,7 @@ public void run() { scheduledFuture = scheduledExecutorService.schedule(startup, msDelay, TimeUnit.MILLISECONDS); } + @Override public void startup() { synchronized (lock) { if (!isRunning) { @@ -262,6 +274,11 @@ public void startup() { this.controlPoint = createControlPoint(protocolFactory, registry); + if (getConfiguration().getMaxRequests() > 0) { + log.debug("Schedule a job checking if the network router needs to be restarted"); + scheduleRouterAutoRestart(); + } + log.debug("UPnP service started successfully"); isRunning = true; @@ -273,6 +290,29 @@ public void startup() { } } + private void scheduleRouterAutoRestart() { + + Runnable autoRestart = new Runnable() { + @Override + public void run() { + log.debug("Checking if the network router needs to be restarted..."); + try { + if (getRouter().autoRestart()) { + log.info("UPnP network router has been restarted"); + } + } catch (RouterException ex) { + log.debug("Exception while restarting the network router: " + ex.getMessage()); + } + } + }; + + if (routerScheduledFuture != null) { + routerScheduledFuture.cancel(true); + } + + routerScheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(autoRestart, 5, 5, TimeUnit.MINUTES); + } + private void setConfigProperties(Map configProperties) { Object prop = configProperties.get("initialSearchEnabled"); if (prop instanceof Boolean) { @@ -282,6 +322,7 @@ private void setConfigProperties(Map configProperties) { protected void activate(Map configProperties) { scheduledFuture = null; + routerScheduledFuture = null; scheduledExecutorService = createExecutor(); setConfigProperties(configProperties); startup(); @@ -291,6 +332,9 @@ protected void deactivate() { if (scheduledFuture != null) { scheduledFuture.cancel(true); } + if (routerScheduledFuture != null) { + routerScheduledFuture.cancel(true); + } scheduledExecutorService.shutdownNow(); shutdown(); diff --git a/bundles/org.jupnp/src/main/java/org/jupnp/controlpoint/ActionCallback.java b/bundles/org.jupnp/src/main/java/org/jupnp/controlpoint/ActionCallback.java index 4bd1808ba..03652f780 100644 --- a/bundles/org.jupnp/src/main/java/org/jupnp/controlpoint/ActionCallback.java +++ b/bundles/org.jupnp/src/main/java/org/jupnp/controlpoint/ActionCallback.java @@ -14,6 +14,8 @@ package org.jupnp.controlpoint; +import java.net.URL; + import org.jupnp.model.action.ActionException; import org.jupnp.model.action.ActionInvocation; import org.jupnp.model.message.UpnpResponse; @@ -21,16 +23,16 @@ import org.jupnp.model.meta.LocalService; import org.jupnp.model.meta.RemoteService; import org.jupnp.model.meta.Service; -import org.jupnp.model.types.ErrorCode; import org.jupnp.protocol.sync.SendingAction; - -import java.net.URL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Execute actions on any service. *

* Usage example for asynchronous execution in a background thread: *

+ * *
  * Service service = device.findService(new UDAServiceId("SwitchPower"));
  * Action getStatusAction = service.getAction("GetStatus");
@@ -56,6 +58,7 @@
  * You can also execute the action synchronously in the same thread using the
  * {@link org.jupnp.controlpoint.ActionCallback.Default} implementation:
  * 

+ * *
  * myActionInvocation.setInput("foo", bar);
  * new ActionCallback.Default(myActionInvocation, upnpService.getControlPoint()).run();
@@ -71,6 +74,7 @@ public abstract class ActionCallback implements Runnable {
      * execution of an {@link org.jupnp.model.action.ActionInvocation}.
      */
     public static final class Default extends ActionCallback {
+        final private Logger log = LoggerFactory.getLogger(Default.class);
 
         public Default(ActionInvocation actionInvocation, ControlPoint controlPoint) {
             super(actionInvocation, controlPoint);
@@ -82,7 +86,7 @@ public void success(ActionInvocation invocation) {
 
         @Override
         public void failure(ActionInvocation invocation, UpnpResponse operation, String defaultMsg) {
-
+            log.debug("Default ActionCallback: {} failed: {}", invocation.getAction().getName(), defaultMsg);
         }
     }
 
@@ -112,12 +116,13 @@ synchronized public ActionCallback setControlPoint(ControlPoint controlPoint) {
         return this;
     }
 
+    @Override
     public void run() {
         Service service = actionInvocation.getAction().getService();
 
         // Local execution
         if (service instanceof LocalService) {
-            LocalService localService = (LocalService)service;
+            LocalService localService = (LocalService) service;
 
             // Executor validates input inside the execute() call immediately
             localService.getExecutor(actionInvocation.getAction()).execute(actionInvocation);
@@ -128,26 +133,33 @@ public void run() {
                 success(actionInvocation);
             }
 
-        // Remote execution
-        } else if (service instanceof RemoteService){
+            // Remote execution
+        } else if (service instanceof RemoteService) {
 
-            if (getControlPoint()  == null) {
+            if (getControlPoint() == null) {
                 throw new IllegalStateException("Callback must be executed through ControlPoint");
             }
 
-            RemoteService remoteService = (RemoteService)service;
+            RemoteService remoteService = (RemoteService) service;
 
             // Figure out the remote URL where we'd like to send the action request to
             URL controLURL;
             try {
-            	controLURL = remoteService.getDevice().normalizeURI(remoteService.getControlURI());
-            } catch(IllegalArgumentException e) {
-            	failure(actionInvocation, null, "bad control URL: " + remoteService.getControlURI());
-            	return ;
+                controLURL = remoteService.getDevice().normalizeURI(remoteService.getControlURI());
+            } catch (IllegalArgumentException e) {
+                failure(actionInvocation, null, "bad control URL: " + remoteService.getControlURI());
+                return;
+            }
+
+            // Ignored HTTPS control URL
+            if ("https".equals(controLURL.getProtocol())) {
+                failure(actionInvocation, null, "ignored action due to HTTPS control URL: " + controLURL);
+                return;
             }
 
             // Do it
-            SendingAction prot = getControlPoint().getProtocolFactory().createSendingAction(actionInvocation, controLURL);
+            SendingAction prot = getControlPoint().getProtocolFactory().createSendingAction(actionInvocation,
+                    controLURL);
             prot.run();
 
             IncomingActionResponseMessage response = prot.getOutputMessage();
diff --git a/bundles/org.jupnp/src/main/java/org/jupnp/mock/MockRouter.java b/bundles/org.jupnp/src/main/java/org/jupnp/mock/MockRouter.java
index daa8212a9..07d97d58b 100644
--- a/bundles/org.jupnp/src/main/java/org/jupnp/mock/MockRouter.java
+++ b/bundles/org.jupnp/src/main/java/org/jupnp/mock/MockRouter.java
@@ -56,8 +56,7 @@ public class MockRouter implements Router {
     protected UpnpServiceConfiguration configuration;
     protected ProtocolFactory protocolFactory;
 
-    public MockRouter(UpnpServiceConfiguration configuration,
-                      ProtocolFactory protocolFactory) {
+    public MockRouter(UpnpServiceConfiguration configuration, ProtocolFactory protocolFactory) {
         this.configuration = configuration;
         this.protocolFactory = protocolFactory;
     }
@@ -91,6 +90,11 @@ public boolean isEnabled() throws RouterException {
         return false;
     }
 
+    @Override
+    public boolean autoRestart() throws RouterException {
+        return false;
+    }
+
     @Override
     public void handleStartFailure(InitializationException ex) throws InitializationException {
     }
@@ -100,37 +104,37 @@ public List getActiveStreamServers(InetAddress preferredAddress)
         // Simulate an active stream server, otherwise the notification/search response
         // protocols won't even run
         try {
-            return Arrays.asList(
-                new NetworkAddress(
-                    InetAddress.getByName("127.0.0.1"),
-                    NetworkAddressFactoryImpl.DEFAULT_TCP_HTTP_LISTEN_PORT
-                )
-            );
+            return Arrays.asList(new NetworkAddress(InetAddress.getByName("127.0.0.1"),
+                    NetworkAddressFactoryImpl.DEFAULT_TCP_HTTP_LISTEN_PORT));
         } catch (UnknownHostException ex) {
             throw new RuntimeException(ex);
         }
     }
 
+    @Override
     public void received(IncomingDatagramMessage msg) {
         incomingDatagramMessages.add(msg);
     }
 
+    @Override
     public void received(UpnpStream stream) {
         receivedUpnpStreams.add(stream);
     }
 
+    @Override
     public void send(OutgoingDatagramMessage msg) throws RouterException {
         outgoingDatagramMessages.add(msg);
     }
 
+    @Override
     public StreamResponseMessage send(StreamRequestMessage msg) throws RouterException {
         sentStreamRequestMessages.add(msg);
         counter++;
-        return getStreamResponseMessages() != null
-            ? getStreamResponseMessages()[counter]
-            : getStreamResponseMessage(msg);
+        return getStreamResponseMessages() != null ? getStreamResponseMessages()[counter]
+                : getStreamResponseMessage(msg);
     }
 
+    @Override
     public void broadcast(byte[] bytes) {
         broadcastedBytes.add(bytes);
     }
diff --git a/bundles/org.jupnp/src/main/java/org/jupnp/transport/Router.java b/bundles/org.jupnp/src/main/java/org/jupnp/transport/Router.java
index f8e175fc3..297d37786 100644
--- a/bundles/org.jupnp/src/main/java/org/jupnp/transport/Router.java
+++ b/bundles/org.jupnp/src/main/java/org/jupnp/transport/Router.java
@@ -14,6 +14,9 @@
 
 package org.jupnp.transport;
 
+import java.net.InetAddress;
+import java.util.List;
+
 import org.jupnp.UpnpServiceConfiguration;
 import org.jupnp.model.NetworkAddress;
 import org.jupnp.model.message.IncomingDatagramMessage;
@@ -24,9 +27,6 @@
 import org.jupnp.transport.spi.InitializationException;
 import org.jupnp.transport.spi.UpnpStream;
 
-import java.net.InetAddress;
-import java.util.List;
-
 /**
  * Interface of the network transport layer.
  * 

@@ -80,7 +80,7 @@ public interface Router { /** * Disables the router and releases all other resources. */ - void shutdown() throws RouterException ; + void shutdown() throws RouterException; /** * @@ -88,6 +88,8 @@ public interface Router { */ boolean isEnabled() throws RouterException; + boolean autoRestart() throws RouterException; + /** * Called by the {@link #enable()} method before it returns. * @@ -112,6 +114,7 @@ public interface Router { * the execution completes, the calling thread should be free to handle the next reception as * soon as possible. *

+ * * @param msg The received datagram message. */ public void received(IncomingDatagramMessage msg); @@ -125,6 +128,7 @@ public interface Router { * should be free to process the next reception as soon as possible. Typically this means starting * a new thread of execution in this method. *

+ * * @param stream */ public void received(UpnpStream stream); @@ -133,6 +137,7 @@ public interface Router { *

* Call this method to send a UDP datagram message. *

+ * * @param msg The UDP datagram message to send. * @throws RouterException if a recoverable error, such as thread interruption, occurs. */ @@ -142,6 +147,7 @@ public interface Router { *

* Call this method to send a TCP (HTTP) stream message. *

+ * * @param msg The TCP (HTTP) stream message to send. * @return The response received from the server. * @throws RouterException if a recoverable error, such as thread interruption, occurs. @@ -152,6 +158,7 @@ public interface Router { *

* Call this method to broadcast a UDP message to all hosts on the network. *

+ * * @param bytes The byte payload of the UDP datagram. * @throws RouterException if a recoverable error, such as thread interruption, occurs. */ diff --git a/bundles/org.jupnp/src/main/java/org/jupnp/transport/RouterImpl.java b/bundles/org.jupnp/src/main/java/org/jupnp/transport/RouterImpl.java index 7ceeead4a..7280bca14 100644 --- a/bundles/org.jupnp/src/main/java/org/jupnp/transport/RouterImpl.java +++ b/bundles/org.jupnp/src/main/java/org/jupnp/transport/RouterImpl.java @@ -77,11 +77,13 @@ public class RouterImpl implements Router { protected final Map datagramIOs = new HashMap(); protected final Map streamServers = new HashMap(); + private int nbfSentRequests; + protected RouterImpl() { } /** - * @param configuration The configuration used by this router. + * @param configuration The configuration used by this router. * @param protocolFactory The protocol factory used by this router. */ public RouterImpl(UpnpServiceConfiguration configuration, ProtocolFactory protocolFactory) { @@ -97,10 +99,12 @@ public boolean disable(DisableRouter event) throws RouterException { return disable(); } + @Override public UpnpServiceConfiguration getConfiguration() { return configuration; } + @Override public ProtocolFactory getProtocolFactory() { return protocolFactory; } @@ -127,13 +131,13 @@ public boolean enable() throws RouterException { // The transports possibly removed some unusable network interfaces/addresses if (!networkAddressFactory.hasUsableNetwork()) { throw new NoNetworkException( - "No usable network interface and/or addresses available, check the log for errors." - ); + "No usable network interface and/or addresses available, check the log for errors."); } // Start the HTTP client last, we don't even have to try if there is no network streamClient = getConfiguration().createStreamClient(); + nbfSentRequests = 0; enabled = true; return true; } catch (InitializationException ex) { @@ -197,6 +201,18 @@ public boolean isEnabled() { return enabled; } + @Override + public boolean autoRestart() throws RouterException { + int max = getConfiguration().getMaxRequests(); + if (enabled && max > 0 && nbfSentRequests >= max) { + log.info("Restarting the UPnP network router..."); + disable(); + enable(); + return true; + } + return false; + } + @Override public void handleStartFailure(InitializationException ex) throws InitializationException { if (ex instanceof NoNetworkException) { @@ -207,6 +223,7 @@ public void handleStartFailure(InitializationException ex) throws Initialization } } + @Override public List getActiveStreamServers(InetAddress preferredAddress) throws RouterException { lock(readLock); try { @@ -214,24 +231,18 @@ public List getActiveStreamServers(InetAddress preferredAddress) List streamServerAddresses = new ArrayList(); StreamServer preferredServer; - if (preferredAddress != null && - (preferredServer = streamServers.get(preferredAddress)) != null) { - streamServerAddresses.add( - new NetworkAddress( - preferredAddress, - preferredServer.getPort(), + if (preferredAddress != null && (preferredServer = streamServers.get(preferredAddress)) != null) { + streamServerAddresses.add(new NetworkAddress(preferredAddress, preferredServer.getPort(), networkAddressFactory.getHardwareAddress(preferredAddress) - ) - ); + )); return streamServerAddresses; } for (Map.Entry entry : streamServers.entrySet()) { byte[] hardwareAddress = networkAddressFactory.getHardwareAddress(entry.getKey()); - streamServerAddresses.add( - new NetworkAddress(entry.getKey(), entry.getValue().getPort(), hardwareAddress) - ); + streamServerAddresses + .add(new NetworkAddress(entry.getKey(), entry.getValue().getPort(), hardwareAddress)); } return streamServerAddresses; } else { @@ -254,6 +265,7 @@ public List getActiveStreamServers(InetAddress preferredAddress) * * @param msg The received datagram message. */ + @Override public void received(IncomingDatagramMessage msg) { if (!enabled) { log.debug("Router disabled, ignoring incoming message: " + msg); @@ -278,6 +290,7 @@ public void received(IncomingDatagramMessage msg) { * * @param stream The received {@link org.jupnp.transport.spi.UpnpStream}. */ + @Override public void received(UpnpStream stream) { if (!enabled) { log.debug("Router disabled, ignoring incoming: " + stream); @@ -292,6 +305,7 @@ public void received(UpnpStream stream) { * * @param msg The UDP datagram message to send. */ + @Override public void send(OutgoingDatagramMessage msg) throws RouterException { lock(readLock); try { @@ -314,6 +328,7 @@ public void send(OutgoingDatagramMessage msg) throws RouterException { * @return The return value of the {@link org.jupnp.transport.spi.StreamClient#sendRequest(StreamRequestMessage)} * method or null if no StreamClient is available. */ + @Override public StreamResponseMessage send(StreamRequestMessage msg) throws RouterException { lock(readLock); try { @@ -322,7 +337,8 @@ public StreamResponseMessage send(StreamRequestMessage msg) throws RouterExcepti log.debug("No StreamClient available, not sending: " + msg); return null; } - log.debug("Sending via TCP unicast stream: " + msg); + nbfSentRequests++; + log.debug("Sending via TCP unicast stream (request number {}): {}", nbfSentRequests, msg); try { return streamClient.sendRequest(msg); } catch (InterruptedException ex) { @@ -346,6 +362,7 @@ public StreamResponseMessage send(StreamRequestMessage msg) throws RouterExcepti * * @param bytes The byte payload of the UDP datagram. */ + @Override public void broadcast(byte[] bytes) throws RouterException { lock(readLock); try { @@ -377,26 +394,23 @@ protected void startInterfaceBasedTransports(Iterator interfac } else { try { log.debug("Init multicast receiver on interface: " + networkInterface.getDisplayName()); - multicastReceiver.init( - networkInterface, - this, - networkAddressFactory, - getConfiguration().getDatagramProcessor() - ); + multicastReceiver.init(networkInterface, this, networkAddressFactory, + getConfiguration().getDatagramProcessor()); multicastReceivers.put(networkInterface, multicastReceiver); } catch (InitializationException ex) { - /* TODO: What are some recoverable exceptions for this? - log.warn( - "Ignoring network interface '" - + networkInterface.getDisplayName() - + "' init failure of MulticastReceiver: " + ex.toString()); - if (log.isTraceEnabled()) - log.log(Level.FINE, "Initialization exception root cause", Exceptions.unwrap(ex)); - log.warn("Removing unusable interface " + interface); - it.remove(); - continue; // Don't need to try anything else on this interface - */ + /* + * TODO: What are some recoverable exceptions for this? + * log.warn( + * "Ignoring network interface '" + * + networkInterface.getDisplayName() + * + "' init failure of MulticastReceiver: " + ex.toString()); + * if (log.isTraceEnabled()) + * log.log(Level.FINE, "Initialization exception root cause", Exceptions.unwrap(ex)); + * log.warn("Removing unusable interface " + interface); + * it.remove(); + * continue; // Don't need to try anything else on this interface + */ throw ex; } } @@ -426,7 +440,7 @@ protected void startAddressBasedTransports(Iterator addresses) thro Throwable cause = Exceptions.unwrap(ex); if (cause instanceof BindException) { log.warn("Failed to init StreamServer: " + cause); - log.debug("Initialization exception root cause", cause); + log.debug("Initialization exception root cause", cause); log.warn("Removing unusable address: " + address); addresses.remove(); continue; // Don't try anything else with this address @@ -442,20 +456,22 @@ protected void startAddressBasedTransports(Iterator addresses) thro } else { try { log.debug("Init datagram I/O on address: " + address); - datagramIO.init(address, networkAddressFactory.getMulticastResponsePort(), this, getConfiguration().getDatagramProcessor()); + datagramIO.init(address, networkAddressFactory.getMulticastResponsePort(), this, + getConfiguration().getDatagramProcessor()); datagramIOs.put(address, datagramIO); } catch (InitializationException ex) { - /* TODO: What are some recoverable exceptions for this? - Throwable cause = Exceptions.unwrap(ex); - if (cause instanceof BindException) { - log.warn("Failed to init datagram I/O: " + cause); - if (log.isTraceEnabled()) - log.log(Level.FINE, "Initialization exception root cause", cause); - log.warn("Removing unusable address: " + address); - addresses.remove(); - continue; // Don't try anything else with this address - } - */ + /* + * TODO: What are some recoverable exceptions for this? + * Throwable cause = Exceptions.unwrap(ex); + * if (cause instanceof BindException) { + * log.warn("Failed to init datagram I/O: " + cause); + * if (log.isTraceEnabled()) + * log.log(Level.FINE, "Initialization exception root cause", cause); + * log.warn("Removing unusable address: " + address); + * addresses.remove(); + * continue; // Don't try anything else with this address + * } + */ throw ex; } } @@ -474,19 +490,17 @@ protected void startAddressBasedTransports(Iterator addresses) thro protected void lock(Lock lock, int timeoutMilliseconds) throws RouterException { try { - log.trace("Trying to obtain lock with timeout milliseconds '" + timeoutMilliseconds + "': " + lock.getClass().getSimpleName()); + log.trace("Trying to obtain lock with timeout milliseconds '" + timeoutMilliseconds + "': " + + lock.getClass().getSimpleName()); if (lock.tryLock(timeoutMilliseconds, TimeUnit.MILLISECONDS)) { log.trace("Acquired router lock: " + lock.getClass().getSimpleName()); } else { - throw new RouterException( - "Router wasn't available exclusively after waiting " + timeoutMilliseconds + "ms, lock failed: " - + lock.getClass().getSimpleName() - ); + throw new RouterException("Router wasn't available exclusively after waiting " + timeoutMilliseconds + + "ms, lock failed: " + lock.getClass().getSimpleName()); } } catch (InterruptedException ex) { throw new RouterException( - "Interruption while waiting for exclusive access: " + lock.getClass().getSimpleName(), ex - ); + "Interruption while waiting for exclusive access: " + lock.getClass().getSimpleName(), ex); } } diff --git a/bundles/org.jupnp/src/main/java/org/jupnp/transport/TransportConfiguration.java b/bundles/org.jupnp/src/main/java/org/jupnp/transport/TransportConfiguration.java index d7ad01fb1..9196cfa0c 100644 --- a/bundles/org.jupnp/src/main/java/org/jupnp/transport/TransportConfiguration.java +++ b/bundles/org.jupnp/src/main/java/org/jupnp/transport/TransportConfiguration.java @@ -20,7 +20,6 @@ import org.jupnp.transport.spi.StreamServer; import org.jupnp.transport.spi.StreamServerConfiguration; - /** * Interface to abstract a transport implementation. * @@ -35,9 +34,11 @@ public interface TransportConfiguration createStreamClient(final ExecutorService executorService); + StreamClient createStreamClient(final ExecutorService executorService, int retryAfterSeconds); /** * Creates a {@link StreamServer} using the given {@code listenerPort}. diff --git a/bundles/org.jupnp/src/main/java/org/jupnp/transport/impl/jetty/JettyTransportConfiguration.java b/bundles/org.jupnp/src/main/java/org/jupnp/transport/impl/jetty/JettyTransportConfiguration.java index d7783a0e5..4be0c077d 100644 --- a/bundles/org.jupnp/src/main/java/org/jupnp/transport/impl/jetty/JettyTransportConfiguration.java +++ b/bundles/org.jupnp/src/main/java/org/jupnp/transport/impl/jetty/JettyTransportConfiguration.java @@ -13,28 +13,23 @@ * * @author Victor Toni - initial contribution */ -public class JettyTransportConfiguration - implements TransportConfiguration { +public class JettyTransportConfiguration implements TransportConfiguration { public static final TransportConfiguration INSTANCE = new JettyTransportConfiguration(); @Override - public StreamClient createStreamClient(final ExecutorService executorService) { - return new JettyStreamClientImpl( - new StreamClientConfigurationImpl( - executorService - ) - ); + public StreamClient createStreamClient(final ExecutorService executorService, int retryAfterSeconds) { + StreamClientConfigurationImpl clientConfiguration = new StreamClientConfigurationImpl(executorService); + if (retryAfterSeconds >= 0) { + clientConfiguration.setRetryAfterSeconds(retryAfterSeconds); + } + return new JettyStreamClientImpl(clientConfiguration); } @Override public StreamServer createStreamServer(final int listenerPort) { return new ServletStreamServerImpl( - new ServletStreamServerConfigurationImpl( - JettyServletContainer.INSTANCE, - listenerPort - ) - ); + new ServletStreamServerConfigurationImpl(JettyServletContainer.INSTANCE, listenerPort)); } } diff --git a/bundles/org.jupnp/src/main/java/org/jupnp/transport/spi/AbstractStreamClient.java b/bundles/org.jupnp/src/main/java/org/jupnp/transport/spi/AbstractStreamClient.java index 44bbb26a2..5a91c6e53 100644 --- a/bundles/org.jupnp/src/main/java/org/jupnp/transport/spi/AbstractStreamClient.java +++ b/bundles/org.jupnp/src/main/java/org/jupnp/transport/spi/AbstractStreamClient.java @@ -32,7 +32,7 @@ /** * Implements the timeout/callback processing and unifies exception handling. - * + * * @author Christian Bauer */ public abstract class AbstractStreamClient implements StreamClient { @@ -51,8 +51,7 @@ public StreamResponseMessage sendRequest(StreamRequestMessage requestMessage) th final Long previeousFailureTime = failedRequests.get(requestMessage.getUri()); if (getConfiguration().getRetryAfterSeconds() > 0 && previeousFailureTime != null) { - if (start - previeousFailureTime < TimeUnit.SECONDS - .toNanos(getConfiguration().getRetryAfterSeconds())) { + if (start - previeousFailureTime < TimeUnit.SECONDS.toNanos(getConfiguration().getRetryAfterSeconds())) { log.debug("Will not attempt request because it failed in the last {} seconds: {}", getConfiguration().getRetryAfterSeconds(), requestMessage); return null; @@ -62,8 +61,9 @@ public StreamResponseMessage sendRequest(StreamRequestMessage requestMessage) th } REQUEST request = createRequest(requestMessage); - if (request == null) + if (request == null) { return null; + } Callable callable = createCallable(requestMessage, request); RequestWrapper requestWrapper = new RequestWrapper(callable); @@ -73,7 +73,8 @@ public StreamResponseMessage sendRequest(StreamRequestMessage requestMessage) th // Wait on the current thread for completion try { - log.trace("Waiting {} seconds for HTTP request to complete: {}", getConfiguration().getTimeoutSeconds(), requestMessage); + log.trace("Waiting {} seconds for HTTP request to complete: {}", getConfiguration().getTimeoutSeconds(), + requestMessage); StreamResponseMessage response = future.get(getConfiguration().getTimeoutSeconds(), TimeUnit.SECONDS); // Log a warning if it took too long @@ -105,12 +106,12 @@ public StreamResponseMessage sendRequest(StreamRequestMessage requestMessage) th if (!logExecutionException(cause)) { String message = "HTTP request failed: " + requestMessage; - if (log.isDebugEnabled()) { + if (log.isDebugEnabled() || log.isTraceEnabled()) { // if debug then the warning will additionally contain the stacktrace of the causing exception - log.warn(message, Exceptions.unwrap(cause)); + log.warn(message + " ({})", cause.getMessage(), cause); } else { // compact logging - log.warn(message + " (" + Exceptions.unwrap(cause).getMessage() + ")"); + log.warn(message + " ({})", Exceptions.unwrap(cause).getMessage()); } } @@ -188,7 +189,7 @@ private void cleanOldFailedRequests(long currentTime) { } } - // Wrap the Callables to track if execution started or if it timed out while waiting in the executor queue + // Wrap the Callables to track if execution started or if it timed out while waiting in the executor queue private static class RequestWrapper implements Callable { Callable task;