org.knime.core.util
Class MultiThreadWorker<In,Out>

java.lang.Object
  extended by org.knime.core.util.MultiThreadWorker<In,Out>
Type Parameters:
In - The type of input to be processed. The Iterable passed to the run(Iterable) method contains elements of this type. Each element is processed in its own (reusable) thread.
Out - The output type generated by the compute(Object, long) method. An output is derived from a corresponding input.

public abstract class MultiThreadWorker<In,Out>
extends Object

An abstract class that processes the elements of an Iterable concurrently. The output of a computation can be of any class, but it must be derived solely from a single input record (because records are processed in parallel).

The worker threads being used can be either from the global KNIME threadpool or any Executor, which is set using the setExecutor(Executor) method.

The generated output must be handled in the abstract processFinished(ComputationTask) method. This method is guaranteed never to be called concurrently: it is called sequentially for each finished computation, though possibly from different worker threads. Outputs arrive in the same order as the inputs. To guarantee this ordering, the class uses an internal cache whose size is determined by a constructor argument.
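
The ordering guarantee described above can be illustrated with plain java.util.concurrent classes. The following is a minimal sketch, not the KNIME implementation: OrderedWorkerSketch, Pending, OrderedWorkerDemo and the fixed pool of four threads are hypothetical names and choices made for illustration. Unlike the class documented here, the sketch calls processFinished only from the thread that invoked run, and it bounds the number of submitted but unprocessed tasks with a single limit.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical stand-in (not the KNIME class): parallel compute, in-order post-processing. */
abstract class OrderedWorkerSketch<In, Out> {
    /** One submitted input together with its index and pending result. */
    private static final class Pending<I, O> {
        final I in;
        final long index;
        final Future<O> future;
        Pending(I in, long index, Future<O> future) {
            this.in = in;
            this.index = index;
            this.future = future;
        }
    }

    private final int maxPending; // bound on submitted-but-unprocessed tasks

    OrderedWorkerSketch(int maxPending) {
        if (maxPending < 1) {
            throw new IllegalArgumentException("maxPending must be >= 1");
        }
        this.maxPending = maxPending;
    }

    /** Called concurrently on pool threads. */
    protected abstract Out compute(In in, long index) throws Exception;

    /** Called sequentially, in input order, from the thread that invoked run. */
    protected abstract void processFinished(In in, long index, Future<Out> result)
            throws ExecutionException, InterruptedException;

    public void run(Iterable<In> inputs) throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4); // assumption: fixed pool
        Deque<Pending<In, Out>> pending = new ArrayDeque<>();
        try {
            long next = 0;
            for (In in : inputs) {
                if (pending.size() == maxPending) {
                    drainOldest(pending); // wait for the oldest task before submitting more
                }
                final long index = next++;
                pending.addLast(new Pending<>(in, index, pool.submit(() -> compute(in, index))));
            }
            while (!pending.isEmpty()) {
                drainOldest(pending);
            }
        } finally {
            pool.shutdownNow();
        }
    }

    private void drainOldest(Deque<Pending<In, Out>> pending)
            throws ExecutionException, InterruptedException {
        Pending<In, Out> p = pending.removeFirst();
        processFinished(p.in, p.index, p.future); // result.get() blocks until the task is done
    }
}

final class OrderedWorkerDemo {
    static List<String> squaresInOrder(List<Integer> inputs) throws Exception {
        List<String> out = new ArrayList<>();
        new OrderedWorkerSketch<Integer, Integer>(3) {
            @Override
            protected Integer compute(Integer in, long index) throws Exception {
                Thread.sleep((5 - in % 5) * 2L); // varying durations, so tasks finish out of order
                return in * in;
            }

            @Override
            protected void processFinished(Integer in, long index, Future<Integer> result)
                    throws ExecutionException, InterruptedException {
                out.add(index + ":" + result.get());
            }
        }.run(inputs);
        return out;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(squaresInOrder(Arrays.asList(1, 2, 3, 4, 5)));
    }
}
```

Because the oldest pending task is always handed to processFinished first, the collected results follow the input order even when later tasks finish earlier.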

Author:
Bernd Wiswedel, KNIME.com, Zurich, Switzerland

Nested Class Summary
 class MultiThreadWorker.ComputationTask
          Represents a single computation, consisting of the corresponding input record, the input index, and the computed output.
 
Constructor Summary
MultiThreadWorker(int maxQueueSize, int maxActiveInstanceSize)
          Creates a new worker with a bounded queue of finished jobs and a maximum number of active jobs.
 
Method Summary
protected  void beforeSubmitting(In in, long index)
          Callback for subclasses to be informed about a new task submission.
 void cancel(boolean mayInterruptIfRunning)
          Cancels an ongoing execution.
protected abstract  Out compute(In in, long index)
          Performs the computation for a given input.
 int getActiveCount()
           
 Executor getExecutor()
           
 long getFinishedCount()
           
 int getFinishedTaskCount()
           
 long getSubmittedCount()
           
protected abstract  void processFinished(MultiThreadWorker.ComputationTask task)
          Post-process a finished computation, for instance write a computed result into a file or add a computed row to a data container.
 void run(Iterable<In> inputIterable)
          Main run method to process the input.
 void setExecutor(Executor executor)
           
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

MultiThreadWorker

public MultiThreadWorker(int maxQueueSize,
                         int maxActiveInstanceSize)
Creates a new worker with a bounded queue of finished jobs and a maximum number of active jobs.

Parameters:
maxQueueSize - Maximum queue size of finished jobs (finished computations might be cached in order to ensure the proper output ordering). If this queue is full (because the next-to-be-processed computation is still ongoing), no further tasks are submitted.
maxActiveInstanceSize - The maximum number of simultaneously running computations (unless further limited by the executor in use).
Throws:
IllegalArgumentException - if queue size < running instance count
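
As an illustration of the two constructor arguments, the sketch below enforces the documented argument check and caps the number of simultaneously running tasks with a Semaphore. It is written under stated assumptions and is not the KNIME implementation: ActiveLimitSketch and peakConcurrency are hypothetical names, the cached thread pool is an arbitrary choice, and maxQueueSize is only validated here (the finished-job queue itself is not modelled).

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of a cap on simultaneously running tasks. */
final class ActiveLimitSketch {
    /** Runs taskCount short tasks and returns the highest concurrency observed. */
    static int peakConcurrency(int taskCount, int maxQueueSize, int maxActive)
            throws InterruptedException {
        if (maxQueueSize < maxActive) {
            // Mirrors the documented condition: queue size < running instance count.
            throw new IllegalArgumentException("queue size < running instance count");
        }
        Semaphore permits = new Semaphore(maxActive);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            for (int i = 0; i < taskCount; i++) {
                permits.acquire(); // blocks the submitter while maxActive tasks are running
                pool.execute(() -> {
                    try {
                        int now = active.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        Thread.sleep(5); // stand-in for real work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        active.decrementAndGet();
                        permits.release(); // released last, so active never exceeds maxActive
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } finally {
            pool.shutdownNow();
        }
        return peak.get();
    }
}
```

Calling peakConcurrency(10, 4, 2) returns a value of at most 2, while peakConcurrency(1, 1, 2) throws IllegalArgumentException.
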
Method Detail

getSubmittedCount

public final long getSubmittedCount()
Returns:
The number of already submitted tasks (the index of the next-to-be-submitted element in the Iterable). Should only be used for statistics.

getFinishedCount

public final long getFinishedCount()
Returns:
The number of elements that have already been processed by the processFinished(ComputationTask) method. Intended for statistics.

getFinishedTaskCount

public final int getFinishedTaskCount()
Returns:
The number of elements currently cached and waiting to be finally processed.

getActiveCount

public final int getActiveCount()
Returns:
Estimate for number of currently active tasks.

run

public void run(Iterable<In> inputIterable)
         throws InterruptedException,
                ExecutionException
Main run method to process the input. This method must be called only once per instance; subsequent calls result in an exception.

The method iterates over the input, runs each element of the argument iterable in its own thread, and returns when all elements have been processed (that is, once the last computation has passed through the processFinished(ComputationTask) method).

Parameters:
inputIterable - The input elements.
Throws:
InterruptedException - If the main execution has been interrupted (the thread executing the run method only delegates work and will often wait for resources to become available).
ExecutionException - If an exception is thrown in a worker thread that is not otherwise handled (for instance if processFinished(ComputationTask) throws an exception).
CancellationException - If cancel(boolean) has been called.

setExecutor

public void setExecutor(Executor executor)
Parameters:
executor - the executor to set (null is the default, in which case the global KNIME thread pool is used).

getExecutor

public Executor getExecutor()
Returns:
the executor
See Also:
setExecutor(Executor)

cancel

public void cancel(boolean mayInterruptIfRunning)
Cancels an ongoing execution.

Parameters:
mayInterruptIfRunning - Whether the worker threads (and the main thread executing the run method) may be interrupted.
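
The flag has the same name as the parameter of java.util.concurrent.Future.cancel(boolean). The sketch below shows that standard semantics on a single FutureTask: with true, the thread running the task is interrupted; with false, a running task is left to finish. Whether MultiThreadWorker forwards the flag to its tasks in exactly this way is an assumption; CancelSketch and workerSawInterrupt are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical illustration of the mayInterruptIfRunning flag on a plain FutureTask. */
final class CancelSketch {
    /** Cancels a task that is already running and reports whether it saw an interrupt. */
    static boolean workerSawInterrupt(boolean mayInterruptIfRunning) throws InterruptedException {
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();
        FutureTask<Void> task = new FutureTask<>(() -> {
            started.countDown(); // signal that the computation has begun
            try {
                Thread.sleep(200); // stand-in for a long computation
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
            return null;
        });
        Thread worker = new Thread(task);
        worker.start();
        started.await();                    // make sure the task is running
        task.cancel(mayInterruptIfRunning); // true: interrupt the worker thread
        worker.join();
        return interrupted.get();
    }
}
```

In both cases the task is marked as cancelled and its get method throws CancellationException; only the effect on the running thread differs.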

beforeSubmitting

protected void beforeSubmitting(In in,
                                long index)
                         throws Exception
Callback for subclasses to be informed about a new task submission. This method is called iteratively from the run(Iterable) method.

This default implementation is empty.

Parameters:
in - The element.
index - The index of the element to be submitted.
Throws:
Exception - If the execution is to be aborted.

compute

protected abstract Out compute(In in,
                               long index)
                        throws Exception
Performs the computation for a given input. This method is called concurrently for different input records.

Parameters:
in - The element.
index - The index of the element.
Returns:
The computed output
Throws:
Exception - Any exception, to be handled in the processFinished(ComputationTask) implementation (more specifically in FutureTask.get()).
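
The reference to FutureTask.get() reflects standard Java behaviour: an exception thrown inside a task is stored and rethrown by get() wrapped in an ExecutionException, with the original exception available as the cause. A minimal sketch using a plain FutureTask follows; ComputeFailureSketch and causeMessage are hypothetical names, and the failing lambda stands in for a compute implementation.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/** Hypothetical illustration of how a task failure surfaces through FutureTask.get(). */
final class ComputeFailureSketch {
    static String causeMessage() throws InterruptedException {
        // Stand-in for a compute(In, long) implementation that fails.
        FutureTask<Integer> task = new FutureTask<>(() -> {
            throw new IllegalStateException("bad input");
        });
        task.run(); // the exception is captured inside the task, not thrown here
        try {
            task.get();
            return "no failure";
        } catch (ExecutionException e) {
            Throwable cause = e.getCause(); // the original exception from the task body
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
        }
    }
}
```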

processFinished

protected abstract void processFinished(MultiThreadWorker.ComputationTask task)
                                 throws ExecutionException,
                                        CancellationException,
                                        InterruptedException
Post-process a finished computation, for instance write a computed result into a file or add a computed row to a data container. This method is not called concurrently, and the passed MultiThreadWorker.ComputationTask objects arrive in the order produced by the iterator of the Iterable passed to the run(Iterable) method.

The result of a computation is to be retrieved using the task's get method. The implementation may want to handle any exception thrown by the computation (for instance by logging an error and replacing the result with an appropriate missing value). If the exception is not handled, the entire execution aborts and an error is logged.

Parameters:
task - The next task to be finally processed.
Throws:
ExecutionException - If the exception thrown by the computation is not handled further; this causes the entire calculation to stop.
CancellationException - If canceled (abort)
InterruptedException - If canceled (abort)
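
The recovery strategy mentioned above (replace a failed result with a missing value instead of aborting) might look like the following sketch. It uses plain Future objects rather than MultiThreadWorker.ComputationTask; MissingValueSketch, collect and the "?" placeholder are hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical illustration: substitute a placeholder when a computation fails. */
final class MissingValueSketch {
    static final String MISSING = "?"; // hypothetical missing-value marker

    static List<String> collect(List<Integer> inputs) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (Integer in : inputs) {
                // Fails with ArithmeticException when the input is 0.
                futures.add(pool.submit(() -> String.valueOf(100 / in)));
            }
            List<String> rows = new ArrayList<>();
            for (Future<String> f : futures) { // consume results in input order
                try {
                    rows.add(f.get());
                } catch (ExecutionException e) {
                    rows.add(MISSING); // handled here, so processing continues
                }
            }
            return rows;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

For the inputs 4, 0 and 5 this yields the rows "25", "?" and "20"; rethrowing the ExecutionException instead would correspond to the abort case listed under Throws.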


Copyright, 2003 - 2012. All rights reserved.
University of Konstanz, Germany.
Chair for Bioinformatics and Information Mining, Prof. Dr. Michael R. Berthold.
You may not modify, publish, transmit, transfer or sell, reproduce, create derivative works from, distribute, perform, display, or in any way exploit any of the content, in whole or in part, except as otherwise expressly permitted in writing by the copyright owner or as specified in the license file distributed with this product.