/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.util.concurrent;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * A {@link java.util.concurrent.CompletionService} that hands out completed tasks
 * in the same order as they were submitted, regardless of the order in which they finish.
 * <p/>
 * Every call to {@link #take()}, {@link #poll()} or {@link #poll(long, TimeUnit)} moves an
 * internal cursor to the next submitted task; a finished task only becomes retrievable once
 * the cursor has reached it. A call that does not return a task (an empty poll, a timeout or
 * an interrupted take) leaves the cursor where it was.
 *
 * @version $Revision: 777657 $
 */
public class SubmitOrderedCompletionService<V> implements CompletionService<V> {

    // Delay reported for a finished task whose turn has not come yet. We cannot know when its
    // turn will come, so this is only a re-check interval. It is kept just above 1000 ns so that
    // a consumer waiting on the queue backs off instead of re-checking every few nanoseconds.
    private static final long NOT_READY_DELAY_NANOS = 1001L;

    private final Executor executor;

    // the idea to order the completed tasks in the same order as they were submitted is to leverage
    // the delay queue. With the delay queue we can control the order by the getDelay and compareTo methods
    // where we can order the tasks in the same order as they were submitted.
    private final DelayQueue<SubmitOrderFutureTask> completionQueue = new DelayQueue<SubmitOrderFutureTask>();

    // id is the unique id that determines the order in which tasks were submitted (incrementing)
    private final AtomicInteger id = new AtomicInteger();
    // index is the id of the task that is currently allowed to expire and thus be taken from the delay queue
    private final AtomicInteger index = new AtomicInteger();

    /**
     * Future task that remembers its submit position and puts itself on the
     * completion queue once it is done.
     */
    private class SubmitOrderFutureTask extends FutureTask<V> implements Delayed {

        // the id this task was assigned
        private final long id;

        public SubmitOrderFutureTask(long id, Callable<V> callable) {
            super(callable);
            this.id = id;
        }

        public SubmitOrderFutureTask(long id, Runnable runnable, V result) {
            super(runnable, result);
            this.id = id;
        }

        /**
         * Returns zero or a negative value when the cursor has reached this task (it may be taken),
         * otherwise a small positive retry delay expressed in the given unit.
         */
        public long getDelay(TimeUnit unit) {
            long distance = id - index.get();
            if (distance <= 0) {
                // it is this task's turn (or its turn has passed), so it is ready to be taken
                return distance;
            }
            // not our turn yet; never round down to 0 as that would signal "ready"
            long delay = unit.convert(NOT_READY_DELAY_NANOS, TimeUnit.NANOSECONDS);
            return delay > 0 ? delay : 1;
        }

        /**
         * Orders tasks by their submit id so the earliest submitted task is the head of the queue.
         */
        @SuppressWarnings("unchecked")
        public int compareTo(Delayed o) {
            // only tasks created by this service are ever put on the queue
            SubmitOrderFutureTask other = (SubmitOrderFutureTask) o;
            if (this.id < other.id) {
                return -1;
            }
            return this.id == other.id ? 0 : 1;
        }

        @Override
        protected void done() {
            // when we are done add to the completion queue
            completionQueue.add(this);
        }
    }

    /**
     * Creates the service.
     *
     * @param executor the executor used to run the submitted tasks
     */
    public SubmitOrderedCompletionService(Executor executor) {
        this.executor = executor;
    }

    /**
     * Submits a task for execution.
     *
     * @param task the task to run
     * @return the future representing the pending completion of the task
     * @throws IllegalArgumentException if the task is <tt>null</tt>
     */
    @SuppressWarnings("unchecked")
    public Future<V> submit(Callable task) {
        if (task == null) {
            throw new IllegalArgumentException("Task must be provided");
        }
        // the parameter is kept raw for source compatibility with existing callers
        SubmitOrderFutureTask f = new SubmitOrderFutureTask(id.incrementAndGet(), (Callable<V>) task);
        executor.execute(f);
        return f;
    }

    /**
     * Submits a task for execution.
     *
     * @param task   the task to run
     * @param result the value the returned future yields once the task has completed
     * @return the future representing the pending completion of the task
     * @throws IllegalArgumentException if the task is <tt>null</tt>
     */
    @SuppressWarnings("unchecked")
    public Future<V> submit(Runnable task, Object result) {
        if (task == null) {
            throw new IllegalArgumentException("Task must be provided");
        }
        // hand the caller supplied result to the future instead of dropping it
        SubmitOrderFutureTask f = new SubmitOrderFutureTask(id.incrementAndGet(), task, (V) result);
        executor.execute(f);
        return f;
    }

    /**
     * Retrieves the next task in submit order, waiting until it has completed.
     *
     * @return the future of the next task in submit order
     * @throws InterruptedException if interrupted while waiting
     */
    public Future<V> take() throws InterruptedException {
        index.incrementAndGet();
        try {
            return completionQueue.take();
        } catch (InterruptedException e) {
            // nothing was taken, so step the cursor back to keep the ordering intact
            index.decrementAndGet();
            throw e;
        }
    }

    /**
     * Retrieves the next task in submit order if it has already completed.
     *
     * @return the future of the next task in submit order, or <tt>null</tt> if it has not completed yet
     */
    public Future<V> poll() {
        index.incrementAndGet();
        Future<V> answer = completionQueue.poll();
        if (answer == null) {
            // nothing was taken, so step the cursor back to keep the ordering intact
            index.decrementAndGet();
        }
        return answer;
    }

    /**
     * Retrieves the next task in submit order, waiting up to the given time for it to complete.
     *
     * @param timeout how long to wait before giving up
     * @param unit    the unit of the timeout
     * @return the future of the next task in submit order, or <tt>null</tt> if the wait timed out
     * @throws InterruptedException if interrupted while waiting
     */
    public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
        index.incrementAndGet();
        Future<V> answer;
        try {
            answer = completionQueue.poll(timeout, unit);
        } catch (InterruptedException e) {
            // nothing was taken, so step the cursor back to keep the ordering intact
            index.decrementAndGet();
            throw e;
        }
        if (answer == null) {
            index.decrementAndGet();
        }
        return answer;
    }
}