public final class ContinuousQuery<K,V> extends Query<ContinuousQuery<K,V>>
Continuous queries allow to register a remote filter and a local listener for cache updates. If an update event passes the filter, it will be sent to the node that executed the query and local listener will be notified.
Additionally, you can execute initial query to get currently existing data.
Query can be of any type (SQL, TEXT or SCAN) and can be set via setInitialPredicate(Query)
method.
Query can be executed either on all nodes in topology using IgniteCache.query(Query)
method of only on the local node using IgniteCache.localQuery(Query)
method.
Note that in case query is distributed and a new node joins, it will get the remote
filter for the query during discovery process before it actually joins topology,
so no updates will be missed.
To create a new instance of continuous query use Query.continuous()
factory method.
'Person'
objects and we need
to query all persons with salary above 1000.
Here is the Person
class:
public class Person { // Name. private String name; // Salary. private double salary; ... }
You can create and execute continuous query like so:
// Create new continuous query. ContinuousQuery qry = Query.continuous(); // Initial iteration query will return all persons with salary above 1000. qry.setInitialPredicate(Query.scan(new IgniteBiPredicate<UUID, Person>() { @Override public boolean apply(UUID id, Person p) { return p.getSalary() > 1000; } })); // Callback that is called locally when update notifications are received. // It simply prints out information about all created persons. qry.setLocalListener(new CacheEntryUpdatedListener<UUID, Person>() { @Override public void onUpdated(Iterable<CacheEntryEvent<? extends UUID, ? extends Person>> evts) { for (CacheEntryEvent<? extends UUID, ? extends Person> e : evts) { Person p = e.getValue(); X.println(">>>"); X.println(">>> " + p.getFirstName() + " " + p.getLastName() + "'s salary is " + p.getSalary()); X.println(">>>"); } } }); // Continuous listener will be notified for persons with salary above 1000. qry.setRemoteFilter(new CacheEntryEventFilter<UUID, Person>() { @Override public boolean evaluate(CacheEntryEvent<? extends UUID, ? extends Person> e) { return e.getValue().getSalary() > 1000; } }); // Execute query and get cursor that iterates through initial data. QueryCursor<Cache.Entry<UUID, Person>> cur = cache.query(qry);This will execute query on all nodes that have cache you are working with and listener will start to receive notifications for cache updates.
To stop receiving updates call QueryCursor.close()
method:
cur.close();Note that this works even if you didn't provide initial query. Cursor will be empty in this case, but it will still unregister listeners when
QueryCursor.close()
is called.Modifier and Type | Field and Description |
---|---|
static boolean |
DFLT_AUTO_UNSUBSCRIBE
Default value for automatic unsubscription flag.
|
static int |
DFLT_BUF_SIZE
Default buffer size.
|
static long |
DFLT_TIME_INTERVAL
Maximum default time interval after which buffer will be flushed (if buffering is enabled).
|
Constructor and Description |
---|
ContinuousQuery() |
Modifier and Type | Method and Description |
---|---|
int |
getBufferSize()
Gets buffer size.
|
Query |
getInitialPredicate()
Gets initial query.
|
javax.cache.event.CacheEntryUpdatedListener<K,V> |
getLocalListener()
Gets local listener.
|
javax.cache.event.CacheEntryEventFilter<K,V> |
getRemoteFilter()
Gets remote filter.
|
long |
getTimeInterval()
Gets time interval.
|
boolean |
isAutoUnsubscribe()
Gets automatic unsubscription flag value.
|
ContinuousQuery<K,V> |
setAutoUnsubscribe(boolean autoUnsubscribe)
Sets automatic unsubscribe flag.
|
ContinuousQuery<K,V> |
setBufferSize(int bufSize)
Sets buffer size.
|
ContinuousQuery<K,V> |
setInitialPredicate(Query initFilter)
Sets initial query.
|
ContinuousQuery<K,V> |
setLocalListener(javax.cache.event.CacheEntryUpdatedListener<K,V> locLsnr)
Sets local callback.
|
ContinuousQuery<K,V> |
setRemoteFilter(javax.cache.event.CacheEntryEventFilter<K,V> rmtFilter)
Sets optional key-value filter.
|
ContinuousQuery<K,V> |
setTimeInterval(long timeInterval)
Sets time interval.
|
continuous, getPageSize, scan, setPageSize, spi, sql, sql, text, toString
public static final int DFLT_BUF_SIZE
1
means that all entries
will be sent to master node immediately (buffering is disabled).public static final long DFLT_TIME_INTERVAL
public static final boolean DFLT_AUTO_UNSUBSCRIBE
public ContinuousQuery<K,V> setInitialPredicate(Query initFilter)
This query will be executed before continuous listener is registered which allows to iterate through entries which already existed at the time continuous query is executed.
initFilter
- Initial query.this
for chaining.public Query getInitialPredicate()
public ContinuousQuery<K,V> setLocalListener(javax.cache.event.CacheEntryUpdatedListener<K,V> locLsnr)
The callback predicate accepts ID of the node from where updates are received and collection
of received entries. Note that for removed entries value will be null
.
If the predicate returns false
, query execution will be cancelled.
WARNING: all operations that involve any kind of JVM-local or distributed locking (e.g., synchronization or transactional cache operations), should be executed asynchronously without blocking the thread that called the callback. Otherwise, you can get deadlocks.
locLsnr
- Local callback.this
for chaining.public javax.cache.event.CacheEntryUpdatedListener<K,V> getLocalListener()
public ContinuousQuery<K,V> setRemoteFilter(javax.cache.event.CacheEntryEventFilter<K,V> rmtFilter)
WARNING: all operations that involve any kind of JVM-local or distributed locking (e.g., synchronization or transactional cache operations), should be executed asynchronously without blocking the thread that called the filter. Otherwise, you can get deadlocks.
rmtFilter
- Key-value filter.this
for chaining.public javax.cache.event.CacheEntryEventFilter<K,V> getRemoteFilter()
public ContinuousQuery<K,V> setBufferSize(int bufSize)
When a cache update happens, entry is first put into a buffer. Entries from buffer will be
sent to the master node only if the buffer is full or time provided via setTimeInterval(long)
method is
exceeded.
Default buffer size is 1
which means that entries will be sent immediately (buffering is
disabled).
bufSize
- Buffer size.this
for chaining.public int getBufferSize()
public ContinuousQuery<K,V> setTimeInterval(long timeInterval)
When a cache update happens, entry is first put into a buffer. Entries from buffer will
be sent to the master node only if the buffer is full (its size can be provided via setBufferSize(int)
method) or time provided via this method is exceeded.
Default time interval is 0
which means that
time check is disabled and entries will be sent only when buffer is full.
timeInterval
- Time interval.this
for chaining.public long getTimeInterval()
public ContinuousQuery<K,V> setAutoUnsubscribe(boolean autoUnsubscribe)
This flag indicates that query filters on remote nodes should be
automatically unregistered if master node (node that initiated the query) leaves topology. If this flag is
false
, filters will be unregistered only when the query is cancelled from master node, and won't ever be
unregistered if master node leaves grid.
Default value for this flag is true
.
autoUnsubscribe
- Automatic unsubscription flag.this
for chaining.public boolean isAutoUnsubscribe()
Follow @ApacheIgnite
Apache Ignite Fabric : ver. 1.0.0-RC1 Release Date : February 16 2015