@IgniteSpiMultipleInstancesSupport(value=true) @IgniteSpiConsistencyChecked(optional=true) public class JobStealingCollisionSpi extends IgniteSpiAdapter implements CollisionSpi, JobStealingCollisionSpiMBean
The design and ideas for this SPI are significantly influenced by the Java Fork/Join Framework authored by Doug Lea and planned for Java 7. JobStealingCollisionSpi took similar concepts and applied them to the grid (as opposed to the within-VM support planned in Java 7).
Quite often grids are deployed across many computers, some of which will always be more powerful than others. This SPI helps you avoid jobs being stuck at a slower node, as they will be stolen by a faster node. For example, when Node3 becomes free, it steals Job13 and Job23 from Node1 and Node2 respectively.
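The Node1/Node2/Node3 scenario above can be modeled in a few lines of plain Java. This is only a toy sketch and not Ignite code: the Node class, the stealOneFromEach helper and the choice to take the last waiting job from each victim are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Toy model of job stealing between node queues; not Ignite code. */
final class JobStealingSketch {
    /** Hypothetical node: a name plus a queue of waiting job names. */
    static final class Node {
        final String name;
        final Deque<String> waiting = new ArrayDeque<>();

        Node(String name, String... jobs) {
            this.name = name;
            for (String job : jobs)
                waiting.addLast(job);
        }
    }

    /** A free thief takes the last waiting job from every other node that has one. */
    static void stealOneFromEach(Node thief, List<Node> others) {
        for (Node victim : others) {
            String job = victim.waiting.pollLast();
            if (job != null)
                thief.waiting.addLast(job);
        }
    }

    public static void main(String[] args) {
        Node node1 = new Node("Node1", "Job11", "Job12", "Job13");
        Node node2 = new Node("Node2", "Job21", "Job22", "Job23");
        Node node3 = new Node("Node3"); // Free node with nothing to do.

        stealOneFromEach(node3, Arrays.asList(node1, node2));

        System.out.println(node3.waiting); // [Job13, Job23]
    }
}
```

In the real SPI the exchange happens through steal requests between nodes; the in-memory queues here only show the end result.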
Note that this SPI must always be used in conjunction with JobStealingFailoverSpi.
Also note that job metrics update should be enabled in order for this SPI to work properly (i.e. IgniteConfiguration.getMetricsUpdateFrequency() should be set to 0 or a greater value).
The responsibility of the Job Stealing Failover SPI is to properly route stolen jobs to the nodes that initially requested (stole) these jobs. The SPI maintains a counter of how many times a job was stolen and hence traveled to another node. JobStealingCollisionSpi checks this counter and will not allow a job to be stolen if this counter exceeds a certain threshold (see setMaximumStealingAttempts(int)).
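The counter check described above might look roughly like the following plain-Java sketch. The Job class and trySteal helper are hypothetical, and the sketch assumes the threshold is the maximum number of times a job may be stolen; the exact comparison used by the SPI is not stated here.

```java
/** Toy model of the stealing-attempt limit; not Ignite code. */
final class StealingAttemptSketch {
    /** Hypothetical job that remembers how many times it has been stolen. */
    static final class Job {
        int stealingAttempts;
    }

    /** Returns true and increments the counter if the job may be stolen once more. */
    static boolean trySteal(Job job, int maxStealingAttempts) {
        // Assumption: a job may be stolen at most maxStealingAttempts times.
        if (job.stealingAttempts >= maxStealingAttempts)
            return false;

        job.stealingAttempts++;

        return true;
    }

    public static void main(String[] args) {
        Job job = new Job();

        System.out.println(trySteal(job, 2)); // true
        System.out.println(trySteal(job, 2)); // true
        System.out.println(trySteal(job, 2)); // false
    }
}
```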
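To use this SPI, configure your grid instance to use JobStealingCollisionSpi either from a Spring XML file or directly. The following configuration parameters are supported:
- Number of jobs that can be executed in parallel (see setActiveJobsThreshold(int)).
- Job count threshold at which this node will start stealing jobs from other nodes (see setWaitJobsThreshold(int)).
- Steal message expire time in milliseconds (see setMessageExpireTime(long)).
- Maximum number of attempts to steal a job by another node (see setMaximumStealingAttempts(int)).
- Whether this node should attempt to steal jobs from other nodes (see setStealingEnabled(boolean)).
- Attributes that restrict stealing to/from only nodes that have these attributes set (see setStealingAttributes(Map)).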
Here is an example of how this SPI can be configured directly from Java code:

JobStealingCollisionSpi spi = new JobStealingCollisionSpi();

// Configure number of waiting jobs
// in the queue for job stealing.
spi.setWaitJobsThreshold(10);

// Configure message expire time (in milliseconds).
spi.setMessageExpireTime(500);

// Configure stealing attempts number.
spi.setMaximumStealingAttempts(10);

// Configure number of active jobs that are allowed to execute
// in parallel. This number should usually be equal to the number
// of threads in the pool (default is 100).
spi.setActiveJobsThreshold(50);

// Enable stealing.
spi.setStealingEnabled(true);

// Set stealing attribute to steal from/to nodes that have it.
spi.setStealingAttributes(Collections.singletonMap("node.segment", "foobar"));

IgniteConfiguration cfg = new IgniteConfiguration();

// Override default Collision SPI.
cfg.setCollisionSpi(spi);

Here is an example of how this SPI can be configured from Spring XML configuration:
<property name="collisionSpi">
    <bean class="org.apache.ignite.spi.collision.jobstealing.JobStealingCollisionSpi">
        <property name="activeJobsThreshold" value="100"/>
        <property name="waitJobsThreshold" value="0"/>
        <property name="messageExpireTime" value="1000"/>
        <property name="maximumStealingAttempts" value="10"/>
        <property name="stealingEnabled" value="true"/>
        <property name="stealingAttributes">
            <map>
                <entry key="node.segment" value="foobar"/>
            </map>
        </property>
    </bean>
</property>
For information about Spring framework visit www.springframework.org
Modifier and Type | Field and Description |
---|---|
static String | ACTIVE_JOBS_THRESHOLD_NODE_ATTR: Threshold of maximum jobs executing concurrently. |
static int | DFLT_ACTIVE_JOBS_THRESHOLD: Default number of parallel jobs allowed (value is 95, which is slightly less than the default number of threads in the execution thread pool, to allow some extra threads for system processing). |
static int | DFLT_JOB_PRIORITY: Default start value for job priority (value is 0). |
static int | DFLT_MAX_STEALING_ATTEMPTS: Maximum number of attempts to steal job by another node (default is 5). |
static long | DFLT_MSG_EXPIRE_TIME: Default steal message expire time in milliseconds (value is 1000). |
static int | DFLT_WAIT_JOBS_THRESHOLD: Default threshold of waiting jobs (value is 0). |
static String | MAX_STEALING_ATTEMPT_ATTR: Maximum stealing attempts attribute name. |
static String | MSG_EXPIRE_TIME_ATTR: Stealing request expiration time attribute name. |
static String | STEALING_ATTEMPT_COUNT_ATTR: Name of job context attribute containing current stealing attempt count. |
static String | STEALING_PRIORITY_ATTR: Stealing priority attribute name. |
static String | THIEF_NODE_ATTR: Job context attribute for storing thief node UUID (this attribute is used in job stealing failover SPI). |
static String | WAIT_JOBS_THRESHOLD_NODE_ATTR: Threshold of maximum jobs on waiting queue. |
Fields inherited from class IgniteSpiAdapter: ignite
Constructor and Description |
---|
JobStealingCollisionSpi() |
Modifier and Type | Method and Description |
---|---|
int | getActiveJobsThreshold(): Gets number of jobs that can be executed in parallel. |
protected List<String> | getConsistentAttributeNames(): Returns a list of attributes that should be consistent for this SPI. |
int | getCurrentActiveJobsNumber(): Gets current number of jobs that are being executed. |
int | getCurrentHeldJobsNumber(): Gets number of currently 'held' jobs. |
int | getCurrentJobsToStealNumber(): Gets current number of jobs to be stolen. |
int | getCurrentRunningJobsNumber() |
int | getCurrentWaitJobsNumber(): Gets current number of jobs that wait for the execution. |
int | getMaximumStealingAttempts(): Gets maximum number of attempts to steal job by another node. |
long | getMessageExpireTime(): Message expire time configuration parameter. |
Map<String,Object> | getNodeAttributes(): This method is called before SPI starts (before method IgniteSpi.spiStart(String) is called). |
Map<String,? extends Serializable> | getStealingAttributes(): Configuration parameter to enable stealing to/from only nodes that have these attributes set (see ClusterNode.attribute(String) and IgniteConfiguration.getUserAttributes() methods). |
int | getTotalStolenJobsNumber(): Gets total number of stolen jobs. |
int | getWaitJobsThreshold(): Gets job count threshold at which this node will start stealing jobs from other nodes. |
boolean | isStealingEnabled(): Gets flag indicating whether this node should attempt to steal jobs from other nodes. |
void | onCollision(CollisionContext ctx): This is a callback called when either a new grid job arrived or an executing job finished its execution. |
void | onContextDestroyed0(): Method to be called in the beginning of onContextDestroyed() method. |
protected void | onContextInitialized0(IgniteSpiContext spiCtx): Method to be called in the end of onContextInitialized method. |
void | setActiveJobsThreshold(int activeJobsThreshold): Sets number of jobs that can be executed in parallel. |
void | setExternalCollisionListener(CollisionExternalListener extLsnr): Listener to be set for notification of external collision events (e.g. job stealing). |
void | setMaximumStealingAttempts(int maxStealingAttempts): Sets maximum number of attempts to steal job by another node. |
void | setMessageExpireTime(long msgExpireTime): Message expire time configuration parameter. |
void | setStealingAttributes(Map<String,? extends Serializable> stealAttrs): Configuration parameter to enable stealing to/from only nodes that have these attributes set (see ClusterNode.attribute(String) and IgniteConfiguration.getUserAttributes() methods). |
void | setStealingEnabled(boolean isStealingEnabled): Sets flag indicating whether this node should attempt to steal jobs from other nodes. |
void | setWaitJobsThreshold(int waitJobsThreshold): Sets job count threshold at which this node will start stealing jobs from other nodes. |
void | spiStart(String gridName): This method is called to start SPI. |
void | spiStop(): This method is called to stop SPI. |
String | toString() |
Methods inherited from class IgniteSpiAdapter: assertParameter, checkConfigurationConsistency0, configInfo, createSpiAttributeName, getExceptionRegistry, getIgniteHome, getLocalNodeId, getName, getSpiContext, getStartTimestamp, getStartTimestampFormatted, getUpTime, getUpTimeFormatted, injectables, onContextDestroyed, onContextInitialized, registerMBean, setName, startInfo, startStopwatch, stopInfo, unregisterMBean
Methods inherited from class Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface IgniteSpi: getName, onContextDestroyed, onContextInitialized
Methods inherited from interface IgniteSpiManagementMBean: getIgniteHome, getLocalNodeId, getName, getStartTimestamp, getStartTimestampFormatted, getUpTime, getUpTimeFormatted
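public static final int DFLT_MAX_STEALING_ATTEMPTS
Maximum number of attempts to steal job by another node (default is 5).

public static final int DFLT_ACTIVE_JOBS_THRESHOLD
Default number of parallel jobs allowed (value is 95, which is slightly less than the default number of threads in the execution thread pool, to allow some extra threads for system processing).

public static final long DFLT_MSG_EXPIRE_TIME
Default steal message expire time in milliseconds (value is 1000). Once this time has elapsed and no response to the steal message has been received, the message is considered lost and another steal message will be generated, potentially to another node.

public static final int DFLT_WAIT_JOBS_THRESHOLD
Default threshold of waiting jobs (value is 0).

public static final int DFLT_JOB_PRIORITY
Default start value for job priority (value is 0).

public static final String THIEF_NODE_ATTR
Job context attribute for storing thief node UUID (this attribute is used in job stealing failover SPI).

public static final String WAIT_JOBS_THRESHOLD_NODE_ATTR
Threshold of maximum jobs on waiting queue.

public static final String ACTIVE_JOBS_THRESHOLD_NODE_ATTR
Threshold of maximum jobs executing concurrently.

public static final String STEALING_ATTEMPT_COUNT_ATTR
Name of job context attribute containing current stealing attempt count.
See also: ComputeJobContext

public static final String MAX_STEALING_ATTEMPT_ATTR
Maximum stealing attempts attribute name.

public static final String MSG_EXPIRE_TIME_ATTR
Stealing request expiration time attribute name.

public static final String STEALING_PRIORITY_ATTR
Stealing priority attribute name.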
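The expiry rule given for DFLT_MSG_EXPIRE_TIME (a steal message with no response within the expire time is considered lost) can be illustrated with a small plain-Java sketch. StealRequestSketch and its methods are hypothetical names, and treating "elapsed" as "greater than or equal to the expire time" is an assumption.

```java
/** Toy model of steal-request expiry; not Ignite code. */
final class StealRequestSketch {
    private final long sentAtMillis;
    private final long expireTimeMillis;
    private boolean responseReceived;

    StealRequestSketch(long sentAtMillis, long expireTimeMillis) {
        this.sentAtMillis = sentAtMillis;
        this.expireTimeMillis = expireTimeMillis;
    }

    /** Records that the victim node answered the steal request. */
    void onResponse() {
        responseReceived = true;
    }

    /** A request counts as lost once the expire time has elapsed without a response. */
    boolean isLost(long nowMillis) {
        return !responseReceived && nowMillis - sentAtMillis >= expireTimeMillis;
    }

    public static void main(String[] args) {
        StealRequestSketch req = new StealRequestSketch(0L, 1_000L);

        System.out.println(req.isLost(999L));   // false
        System.out.println(req.isLost(1_000L)); // true: send another request, potentially to another node
    }
}
```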
@IgniteSpiConfiguration(optional=true) public void setActiveJobsThreshold(int activeJobsThreshold)
Sets number of jobs that can be executed in parallel.
Specified by: setActiveJobsThreshold in interface JobStealingCollisionSpiMBean
Parameters: activeJobsThreshold - Number of jobs that can be executed in parallel.

public int getActiveJobsThreshold()
Gets number of jobs that can be executed in parallel.
Specified by: getActiveJobsThreshold in interface JobStealingCollisionSpiMBean

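@IgniteSpiConfiguration(optional=true) public void setWaitJobsThreshold(int waitJobsThreshold)
Sets job count threshold at which this node will start stealing jobs from other nodes.
Specified by: setWaitJobsThreshold in interface JobStealingCollisionSpiMBean
Parameters: waitJobsThreshold - Job count threshold.

public int getWaitJobsThreshold()
Gets job count threshold at which this node will start stealing jobs from other nodes.
Specified by: getWaitJobsThreshold in interface JobStealingCollisionSpiMBean
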
@IgniteSpiConfiguration(optional=true) public void setMessageExpireTime(long msgExpireTime)
Message expire time configuration parameter.
Specified by: setMessageExpireTime in interface JobStealingCollisionSpiMBean
Parameters: msgExpireTime - Message expire time.

public long getMessageExpireTime()
Message expire time configuration parameter.
Specified by: getMessageExpireTime in interface JobStealingCollisionSpiMBean

@IgniteSpiConfiguration(optional=true) public void setStealingEnabled(boolean isStealingEnabled)
Sets flag indicating whether this node should attempt to steal jobs from other nodes. If false, then this node will allow jobs to be stolen from it, but won't attempt to steal any jobs from other nodes. Default value is true.
Specified by: setStealingEnabled in interface JobStealingCollisionSpiMBean
Parameters: isStealingEnabled - Flag indicating whether this node should attempt to steal jobs from other nodes.

public boolean isStealingEnabled()
Gets flag indicating whether this node should attempt to steal jobs from other nodes. If false, then this node will allow jobs to be stolen from it, but won't attempt to steal any jobs from other nodes. Default value is true.
Specified by: isStealingEnabled in interface JobStealingCollisionSpiMBean

@IgniteSpiConfiguration(optional=true) public void setMaximumStealingAttempts(int maxStealingAttempts)
Sets maximum number of attempts to steal job by another node. If not specified, the DFLT_MAX_STEALING_ATTEMPTS value will be used.
Specified by: setMaximumStealingAttempts in interface JobStealingCollisionSpiMBean
Parameters: maxStealingAttempts - Maximum number of attempts to steal job by another node.

public int getMaximumStealingAttempts()
Gets maximum number of attempts to steal job by another node. If not specified, the DFLT_MAX_STEALING_ATTEMPTS value will be used.
Specified by: getMaximumStealingAttempts in interface JobStealingCollisionSpiMBean

@IgniteSpiConfiguration(optional=true) public void setStealingAttributes(Map<String,? extends Serializable> stealAttrs)
Configuration parameter to enable stealing to/from only nodes that have these attributes set (see ClusterNode.attribute(String) and IgniteConfiguration.getUserAttributes() methods).
Parameters: stealAttrs - Node attributes to enable job stealing for.

public Map<String,? extends Serializable> getStealingAttributes()
Configuration parameter to enable stealing to/from only nodes that have these attributes set (see ClusterNode.attribute(String) and IgniteConfiguration.getUserAttributes() methods).
Specified by: getStealingAttributes in interface JobStealingCollisionSpiMBean

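One way to picture the stealing-attributes restriction is a containment check between the configured attributes and a node's attributes. The sketch below is plain Java, not Ignite code: the eligible helper is hypothetical, and treating an empty or missing attribute map as "no restriction" is an assumption.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Toy model of attribute-based stealing eligibility; not Ignite code. */
final class StealingAttributesSketch {
    /** Returns true if the node carries every configured stealing attribute with an equal value. */
    static boolean eligible(Map<String, ?> nodeAttrs, Map<String, ?> stealAttrs) {
        // Assumption: no configured attributes means no restriction.
        if (stealAttrs == null || stealAttrs.isEmpty())
            return true;

        for (Map.Entry<String, ?> e : stealAttrs.entrySet()) {
            if (!Objects.equals(e.getValue(), nodeAttrs.get(e.getKey())))
                return false;
        }

        return true;
    }

    public static void main(String[] args) {
        Map<String, String> stealAttrs = Collections.singletonMap("node.segment", "foobar");

        Map<String, Object> nodeA = new HashMap<>();
        nodeA.put("node.segment", "foobar");
        nodeA.put("cpus", 8);

        Map<String, Object> nodeB = new HashMap<>();
        nodeB.put("node.segment", "other");

        System.out.println(eligible(nodeA, stealAttrs)); // true
        System.out.println(eligible(nodeB, stealAttrs)); // false
    }
}
```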
public int getCurrentRunningJobsNumber()
Specified by: getCurrentRunningJobsNumber in interface JobStealingCollisionSpiMBean

public int getCurrentHeldJobsNumber()
Gets number of currently 'held' jobs.
Specified by: getCurrentHeldJobsNumber in interface JobStealingCollisionSpiMBean
Returns: Number of currently 'held' jobs.

public int getCurrentWaitJobsNumber()
Gets current number of jobs that wait for the execution.
Specified by: getCurrentWaitJobsNumber in interface JobStealingCollisionSpiMBean

public int getCurrentActiveJobsNumber()
Gets current number of jobs that are being executed.
Specified by: getCurrentActiveJobsNumber in interface JobStealingCollisionSpiMBean

public int getTotalStolenJobsNumber()
Gets total number of stolen jobs.
Specified by: getTotalStolenJobsNumber in interface JobStealingCollisionSpiMBean

public int getCurrentJobsToStealNumber()
Gets current number of jobs to be stolen.
Specified by: getCurrentJobsToStealNumber in interface JobStealingCollisionSpiMBean

public Map<String,Object> getNodeAttributes() throws IgniteSpiException
This method is called before SPI starts (before method IgniteSpi.spiStart(String) is called). It allows the SPI implementation to add attributes to the local node. Kernal collects these attributes from all loaded SPI implementations and then passes them to the discovery SPI so that they can be exchanged with other nodes.
Specified by: getNodeAttributes in interface IgniteSpi
Overrides: getNodeAttributes in class IgniteSpiAdapter
Throws: IgniteSpiException - Thrown in case of any error.

public void spiStart(String gridName) throws IgniteSpiException
This method is called to start SPI.
Specified by: spiStart in interface IgniteSpi
Parameters: gridName - Name of grid instance this SPI is being started for (null for default grid).
Throws: IgniteSpiException - Thrown in case of any error during SPI start.

public void spiStop() throws IgniteSpiException
This method is called to stop SPI.
Note that this method can be called at any point, including during recovery from a failed start. It should make no assumptions about what state the SPI will be in when this method is called.
Specified by: spiStop in interface IgniteSpi
Throws: IgniteSpiException - Thrown in case of any error during SPI stop.

public void setExternalCollisionListener(CollisionExternalListener extLsnr)
Listener to be set for notification of external collision events (e.g. job stealing).
Ignite uses this listener to enable job stealing from overloaded to underloaded nodes. However, you can also use it, for instance, to provide time-based collision resolution. To achieve this, you would most likely mark a job that requires time-based scheduling by setting a certain attribute in its job context (see ComputeJobContext) and set a timer in your SPI implementation that wakes up after a certain period of time. Once this period has elapsed, you would notify this listener that a collision resolution should take place. Then, inside your collision resolution logic, you would find the marked waiting job and activate it.
Note that most collision SPIs might not have external collisions. In that case, they should simply ignore this method and do nothing when the listener is set.
Specified by: setExternalCollisionListener in interface CollisionSpi
Parameters: extLsnr - Listener for external collision events.

protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException
Method to be called in the end of onContextInitialized method.
Overrides: onContextInitialized0 in class IgniteSpiAdapter
Parameters: spiCtx - SPI context.
Throws: IgniteSpiException - In case of errors.

public void onContextDestroyed0()
Method to be called in the beginning of onContextDestroyed() method.
Overrides: onContextDestroyed0 in class IgniteSpiAdapter

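The time-based collision resolution outlined under setExternalCollisionListener can be sketched in plain Java as follows. Nothing here is Ignite API: ExternalListener and WaitingJob are hypothetical stand-ins for the listener and the marked job context, and the timer is modeled as an explicit onTimer(now) call so that the example stays deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of time-based collision resolution; not Ignite code. */
final class TimedCollisionSketch {
    /** Hypothetical stand-in for an external collision listener. */
    interface ExternalListener {
        void onExternalCollision();
    }

    /** Hypothetical waiting job; 'timed' plays the role of the job context attribute. */
    static final class WaitingJob {
        final String name;
        final boolean timed;
        boolean active;

        WaitingJob(String name, boolean timed) {
            this.name = name;
            this.timed = timed;
        }
    }

    private final long wakeUpAtMillis;
    private ExternalListener lsnr;
    private boolean notified;

    TimedCollisionSketch(long wakeUpAtMillis) {
        this.wakeUpAtMillis = wakeUpAtMillis;
    }

    void setExternalListener(ExternalListener lsnr) {
        this.lsnr = lsnr;
    }

    /** Timer callback: notifies the listener once, when the wake-up time is reached. */
    void onTimer(long nowMillis) {
        if (!notified && lsnr != null && nowMillis >= wakeUpAtMillis) {
            notified = true;
            lsnr.onExternalCollision();
        }
    }

    /** Collision resolution step: activates every marked waiting job and returns their names. */
    static List<String> activateTimedJobs(List<WaitingJob> waiting) {
        List<String> activated = new ArrayList<>();

        for (WaitingJob job : waiting) {
            if (job.timed && !job.active) {
                job.active = true;
                activated.add(job.name);
            }
        }

        return activated;
    }

    public static void main(String[] args) {
        List<WaitingJob> waiting = Arrays.asList(new WaitingJob("report", true), new WaitingJob("batch", false));

        TimedCollisionSketch spi = new TimedCollisionSketch(1_000L);
        spi.setExternalListener(() -> System.out.println("Activated: " + activateTimedJobs(waiting)));

        spi.onTimer(500L);   // Too early, nothing happens.
        spi.onTimer(1_000L); // Prints "Activated: [report]".
    }
}
```

A real implementation would drive onTimer from an actual timer thread and look up the mark through the job context rather than a boolean field.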
public void onCollision(CollisionContext ctx)
This is a callback called when either a new grid job arrived or an executing job finished its execution.
Implementation of this method should act on all lists, each of which contains collision job contexts that define a set of operations available during collision resolution. Refer to CollisionContext and CollisionJobContext documentation for more information.
Specified by: onCollision in interface CollisionSpi
Parameters: ctx - Collision context which contains all collision lists.

protected List<String> getConsistentAttributeNames()
Returns a list of attributes that should be consistent for this SPI.
Overrides: getConsistentAttributeNames in class IgniteSpiAdapter
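As a rough illustration of the kind of decision an onCollision implementation makes, the plain-Java sketch below activates waiting jobs until the active jobs threshold is reached. It is not the SPI's actual logic: WaitingJob is a hypothetical stand-in for a collision job context, and the stealing-related steps are left out entirely.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of threshold-based job activation; not Ignite code. */
final class CollisionSketch {
    /** Hypothetical stand-in for a waiting job's collision context. */
    static final class WaitingJob {
        final String name;
        boolean activated;

        WaitingJob(String name) {
            this.name = name;
        }

        void activate() {
            activated = true;
        }
    }

    /** Activates waiting jobs in order while fewer than activeJobsThreshold jobs are running. */
    static int activateUpTo(int activeJobsThreshold, int activeJobs, List<WaitingJob> waiting) {
        int activated = 0;

        for (WaitingJob job : waiting) {
            if (activeJobs + activated >= activeJobsThreshold)
                break;

            job.activate();
            activated++;
        }

        return activated;
    }

    public static void main(String[] args) {
        List<WaitingJob> waiting = Arrays.asList(new WaitingJob("a"), new WaitingJob("b"), new WaitingJob("c"));

        // Threshold of 3 with 1 job already running leaves room for 2 more.
        System.out.println(activateUpTo(3, 1, waiting)); // 2
    }
}
```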
Apache Ignite Fabric : ver. 1.0.0 Release Date : March 31 2015