001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.util.concurrent; 018 019 import java.util.concurrent.TimeUnit; 020 import java.util.concurrent.locks.AbstractQueuedSynchronizer; 021 022 /** 023 * An alternative to a {@link java.util.concurrent.CountDownLatch}. 024 * <p/> 025 * This implementation also supports incrementing the latch count while counting down. 026 * It can also be used to count up to 0 from a negative integer. 027 */ 028 public class CountingLatch { 029 030 @SuppressWarnings("serial") 031 private final class Sync extends AbstractQueuedSynchronizer { 032 033 private Sync() { 034 super(); 035 } 036 037 int getCount() { 038 return getState(); 039 } 040 041 public int tryAcquireShared(int acquires) { 042 return getState() == 0 ? 1 : -1; 043 } 044 045 public boolean tryReleaseShared(int delta) { 046 // Decrement count; signal when transition to zero 047 for (;;) { 048 int c = getState(); 049 int nextc = c + delta; 050 if (compareAndSetState(c, nextc)) { 051 return nextc == 0; 052 } 053 } 054 } 055 } 056 057 private final Sync sync; 058 059 /** 060 * Create a new counting latch (starting count is 0) 061 */ 062 public CountingLatch() { 063 super(); 064 this.sync = new Sync(); 065 } 066 067 /** 068 * Get the current count 069 */ 070 public int getCount() { 071 return sync.getCount(); 072 } 073 074 /** 075 * Increment the count with 1 076 */ 077 public void increment() { 078 sync.releaseShared(+1); 079 } 080 081 /** 082 * Decrement the count with 1 083 */ 084 public void decrement() { 085 sync.releaseShared(-1); 086 } 087 088 /** 089 * Await the latch reaching the count of 0 090 * 091 * @throws InterruptedException if the threads gets interrupted while waiting 092 */ 093 public void await() throws InterruptedException { 094 sync.acquireSharedInterruptibly(1); 095 } 096 097 /** 098 * Wait for a given timeout while checking if the latch reached the count of 0 099 * 100 * @param timeout the value of the timeout 101 * @param unit the unit in which the timeout is expressed 102 * @return <code>true</code> if the latch has reached the count of 0 in the given time 103 * @throws InterruptedException if the thread gets interrupted while waiting 104 */ 105 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { 106 return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); 107 } 108 109 }