package org.activemq.store.journal;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import javax.jms.JMSException;
import org.activeio.journal.RecordLocation;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.MessageAck;
import org.activemq.service.MessageIdentity;
import org.activemq.service.Transaction;
import org.activemq.service.TransactionManager;
import org.activemq.service.TransactionTask;
import org.activemq.store.MessageStore;
import org.activemq.store.RecoveryListener;
import org.activemq.store.cache.CacheMessageStore;
import org.activemq.store.cache.CacheMessageStoreAware;
import org.activemq.util.Callback;
import org.activemq.util.TransactionTemplate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:zips/geronimo-tomcat-j2ee-1.1.zip:geronimo-1.1/repository/activemq/activemq-core/3.2.4/activemq-core-3.2.4.jar:org/activemq/store/journal/JournalMessageStore.class
  input_file:zips/geronimo-tomcat-j2ee-1.1.zip:geronimo-1.1/repository/geronimo/activemq/1.1/activemq-1.1.car/rar/activemq-core-3.2.4.jar:org/activemq/store/journal/JournalMessageStore.class
 */
/* loaded from: input_file:zips/geronimo-tomcat-j2ee-1.1.zip:geronimo-1.1/repository/geronimo/ge-activemq-rar/1.1/ge-activemq-rar-1.1.rar:activemq-core-3.2.4.jar:org/activemq/store/journal/JournalMessageStore.class */
public class JournalMessageStore implements MessageStore, CacheMessageStoreAware {
    private static final Log log;
    protected final JournalPersistenceAdapter peristenceAdapter;
    protected final MessageStore longTermStore;
    protected final String destinationName;
    protected final TransactionTemplate transactionTemplate;
    protected RecordLocation lastLocation;
    protected final JournalTransactionStore transactionStore;
    private LinkedHashMap cpAddedMessageIds;
    int removedFromJournal;
    static Class class$org$activemq$store$journal$JournalMessageStore;
    private LinkedHashMap addedMessageIds = new LinkedHashMap();
    private ArrayList removedMessageLocations = new ArrayList();
    protected HashSet inFlightTxLocations = new HashSet();
    private MessageStore cacheMessageStore = this;

    public JournalMessageStore(JournalPersistenceAdapter journalPersistenceAdapter, MessageStore messageStore, String str) {
        this.peristenceAdapter = journalPersistenceAdapter;
        this.transactionStore = this.peristenceAdapter.getTransactionStore();
        this.longTermStore = messageStore;
        this.destinationName = str;
        this.transactionTemplate = new TransactionTemplate(journalPersistenceAdapter);
    }

    @Override // org.activemq.store.MessageStore
    public void addMessage(ActiveMQMessage activeMQMessage) throws JMSException {
        boolean isDebugEnabled = log.isDebugEnabled();
        RecordLocation writePacket = this.peristenceAdapter.writePacket(this.destinationName, activeMQMessage, activeMQMessage.isReceiptRequired());
        if (!TransactionManager.isCurrentTransaction()) {
            if (isDebugEnabled) {
                log.debug(new StringBuffer().append("Journalled message add: ").append(activeMQMessage.getJMSMessageID()).append(" at ").append(writePacket).toString());
            }
            addMessage(activeMQMessage, writePacket);
            return;
        }
        if (isDebugEnabled) {
            log.debug(new StringBuffer().append("Journalled in flight message add: ").append(activeMQMessage.getJMSMessageID()).append(" at ").append(writePacket).toString());
        }
        synchronized (this) {
            this.inFlightTxLocations.add(writePacket);
        }
        Transaction contexTransaction = TransactionManager.getContexTransaction();
        this.transactionStore.addMessage(this, activeMQMessage, writePacket);
        contexTransaction.addPostCommitTask(new TransactionTask(this, isDebugEnabled, activeMQMessage, writePacket) { // from class: org.activemq.store.journal.JournalMessageStore.1
            private final boolean val$debug;
            private final ActiveMQMessage val$message;
            private final RecordLocation val$location;
            private final JournalMessageStore this$0;

            {
                this.this$0 = this;
                this.val$debug = isDebugEnabled;
                this.val$message = activeMQMessage;
                this.val$location = writePacket;
            }

            @Override // org.activemq.service.TransactionTask
            public void execute() throws Throwable {
                if (this.val$debug) {
                    JournalMessageStore.log.debug(new StringBuffer().append("In flight message add commit: ").append(this.val$message.getJMSMessageID()).append(" at ").append(this.val$location).toString());
                }
                synchronized (this.this$0) {
                    this.this$0.inFlightTxLocations.remove(this.val$location);
                    this.this$0.addMessage(this.val$message, this.val$location);
                }
            }
        });
        contexTransaction.addPostRollbackTask(new TransactionTask(this, isDebugEnabled, activeMQMessage, writePacket) { // from class: org.activemq.store.journal.JournalMessageStore.2
            private final boolean val$debug;
            private final ActiveMQMessage val$message;
            private final RecordLocation val$location;
            private final JournalMessageStore this$0;

            {
                this.this$0 = this;
                this.val$debug = isDebugEnabled;
                this.val$message = activeMQMessage;
                this.val$location = writePacket;
            }

            @Override // org.activemq.service.TransactionTask
            public void execute() throws Throwable {
                if (this.val$debug) {
                    JournalMessageStore.log.debug(new StringBuffer().append("In flight message add rollback: ").append(this.val$message.getJMSMessageID()).append(" at ").append(this.val$location).toString());
                }
                synchronized (this.this$0) {
                    this.this$0.inFlightTxLocations.remove(this.val$location);
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void addMessage(ActiveMQMessage activeMQMessage, RecordLocation recordLocation) {
        synchronized (this) {
            this.lastLocation = recordLocation;
            this.addedMessageIds.put(activeMQMessage.getJMSMessageIdentity(), recordLocation);
        }
    }

    @Override // org.activemq.store.MessageStore
    public void removeMessage(MessageAck messageAck) throws JMSException {
        boolean isDebugEnabled = log.isDebugEnabled();
        RecordLocation writePacket = this.peristenceAdapter.writePacket(this.destinationName, messageAck, messageAck.isReceiptRequired());
        if (!TransactionManager.isCurrentTransaction()) {
            if (isDebugEnabled) {
                log.debug(new StringBuffer().append("Journalled message remove: ").append(messageAck.getMessageID()).append(" at ").append(writePacket).toString());
            }
            removeMessage(messageAck, writePacket);
            return;
        }
        if (isDebugEnabled) {
            log.debug(new StringBuffer().append("Journalled in flight message remove: ").append(messageAck.getMessageID()).append(" at ").append(writePacket).toString());
        }
        synchronized (this) {
            this.inFlightTxLocations.add(writePacket);
        }
        Transaction contexTransaction = TransactionManager.getContexTransaction();
        this.transactionStore.removeMessage(this, messageAck, writePacket);
        contexTransaction.addPostCommitTask(new TransactionTask(this, isDebugEnabled, messageAck, writePacket) { // from class: org.activemq.store.journal.JournalMessageStore.3
            private final boolean val$debug;
            private final MessageAck val$ack;
            private final RecordLocation val$location;
            private final JournalMessageStore this$0;

            {
                this.this$0 = this;
                this.val$debug = isDebugEnabled;
                this.val$ack = messageAck;
                this.val$location = writePacket;
            }

            @Override // org.activemq.service.TransactionTask
            public void execute() throws Throwable {
                if (this.val$debug) {
                    JournalMessageStore.log.debug(new StringBuffer().append("In flight message remove commit: ").append(this.val$ack.getMessageID()).append(" at ").append(this.val$location).toString());
                }
                synchronized (this.this$0) {
                    this.this$0.inFlightTxLocations.remove(this.val$location);
                    this.this$0.removeMessage(this.val$ack, this.val$location);
                }
            }
        });
        contexTransaction.addPostRollbackTask(new TransactionTask(this, isDebugEnabled, messageAck, writePacket) { // from class: org.activemq.store.journal.JournalMessageStore.4
            private final boolean val$debug;
            private final MessageAck val$ack;
            private final RecordLocation val$location;
            private final JournalMessageStore this$0;

            {
                this.this$0 = this;
                this.val$debug = isDebugEnabled;
                this.val$ack = messageAck;
                this.val$location = writePacket;
            }

            @Override // org.activemq.service.TransactionTask
            public void execute() throws Throwable {
                if (this.val$debug) {
                    JournalMessageStore.log.debug(new StringBuffer().append("In flight message remove rollback: ").append(this.val$ack.getMessageID()).append(" at ").append(this.val$location).toString());
                }
                synchronized (this.this$0) {
                    this.this$0.inFlightTxLocations.remove(this.val$location);
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void removeMessage(MessageAck messageAck, RecordLocation recordLocation) {
        synchronized (this) {
            this.lastLocation = recordLocation;
            if (((RecordLocation) this.addedMessageIds.remove(messageAck.getMessageIdentity())) == null) {
                this.removedMessageLocations.add(messageAck);
            } else {
                this.removedFromJournal++;
            }
        }
    }

    public RecordLocation checkpoint() throws JMSException {
        ArrayList arrayList;
        ArrayList arrayList2;
        synchronized (this) {
            this.cpAddedMessageIds = this.addedMessageIds;
            arrayList = this.removedMessageLocations;
            this.inFlightTxLocations.removeAll(this.removedMessageLocations);
            this.inFlightTxLocations.removeAll(this.addedMessageIds.values());
            arrayList2 = new ArrayList(this.inFlightTxLocations);
            this.addedMessageIds = new LinkedHashMap();
            this.removedMessageLocations = new ArrayList();
            log.debug(new StringBuffer().append("removedFromJournal=").append(this.removedFromJournal).toString());
            this.removedFromJournal = 0;
        }
        boolean isDebugEnabled = log.isDebugEnabled();
        if (isDebugEnabled) {
            log.debug(new StringBuffer().append("Checkpoint: ").append(this.destinationName).toString());
        }
        int[] iArr = {0};
        int[] iArr2 = {0};
        this.transactionTemplate.run(new Callback(this, isDebugEnabled, iArr, arrayList, iArr2) { // from class: org.activemq.store.journal.JournalMessageStore.5
            private final boolean val$debug;
            private final int[] val$messagesAdded;
            private final ArrayList val$cpRemovedMessageLocations;
            private final int[] val$messagesRemoved;
            private final JournalMessageStore this$0;

            {
                this.this$0 = this;
                this.val$debug = isDebugEnabled;
                this.val$messagesAdded = iArr;
                this.val$cpRemovedMessageLocations = arrayList;
                this.val$messagesRemoved = iArr2;
            }

            @Override // org.activemq.util.Callback
            public void execute() throws Throwable {
                for (MessageIdentity messageIdentity : this.this$0.cpAddedMessageIds.keySet()) {
                    if (this.val$debug) {
                        JournalMessageStore.log.debug(new StringBuffer().append("Adding: ").append(messageIdentity.getMessageID()).toString());
                    }
                    ActiveMQMessage cacheMessage = this.this$0.getCacheMessage(messageIdentity);
                    if (cacheMessage == null) {
                        cacheMessage = (ActiveMQMessage) this.this$0.peristenceAdapter.readPacket((RecordLocation) this.this$0.cpAddedMessageIds.get(messageIdentity));
                    }
                    if (cacheMessage != null) {
                        try {
                            this.this$0.longTermStore.addMessage(cacheMessage);
                            int[] iArr3 = this.val$messagesAdded;
                            iArr3[0] = iArr3[0] + 1;
                        } catch (Throwable th) {
                            JournalMessageStore.log.warn(new StringBuffer().append("Message could not be added to long term store: ").append(th.getMessage()).toString(), th);
                        }
                    } else {
                        JournalMessageStore.log.warn(new StringBuffer().append("Journal could not reload message: ").append(messageIdentity).toString());
                    }
                }
                Iterator it = this.val$cpRemovedMessageLocations.iterator();
                while (it.hasNext()) {
                    try {
                        MessageAck messageAck = (MessageAck) it.next();
                        if (this.val$debug) {
                            JournalMessageStore.log.debug(new StringBuffer().append("Removing: ").append(messageAck.getMessageID()).toString());
                        }
                        this.this$0.longTermStore.removeMessage(messageAck);
                        int[] iArr4 = this.val$messagesRemoved;
                        iArr4[0] = iArr4[0] + 1;
                    } catch (Throwable th2) {
                        JournalMessageStore.log.debug(new StringBuffer().append("Message could not be removed from long term store: ").append(th2.getMessage()).toString(), th2);
                    }
                }
            }
        });
        log.debug(new StringBuffer().append("Added ").append(iArr[0]).append(" message(s) and removed ").append(iArr2[0]).append(" message(s). removedFromJournal=").append(this.removedFromJournal).toString());
        synchronized (this) {
            this.cpAddedMessageIds = null;
        }
        Collections.sort(arrayList2);
        if (isDebugEnabled) {
            log.debug(new StringBuffer().append("In flight journal locations: ").append(arrayList2).toString());
        }
        return arrayList2.size() > 0 ? (RecordLocation) arrayList2.get(0) : this.lastLocation;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ActiveMQMessage getCacheMessage(MessageIdentity messageIdentity) throws JMSException {
        return this.cacheMessageStore.getMessage(messageIdentity);
    }

    @Override // org.activemq.store.MessageStore
    public ActiveMQMessage getMessage(MessageIdentity messageIdentity) throws JMSException {
        Object obj;
        synchronized (this) {
            obj = this.addedMessageIds.get(messageIdentity);
            if (obj == null && this.cpAddedMessageIds != null) {
                obj = this.cpAddedMessageIds.get(messageIdentity);
            }
        }
        if (obj != null) {
            try {
                ActiveMQMessage activeMQMessage = (ActiveMQMessage) this.peristenceAdapter.readPacket((RecordLocation) obj);
                if (activeMQMessage != null) {
                    return activeMQMessage;
                }
            } catch (Throwable th) {
            }
        }
        return this.longTermStore.getMessage(messageIdentity);
    }

    @Override // org.activemq.store.MessageStore
    public void recover(RecoveryListener recoveryListener) throws JMSException {
        this.peristenceAdapter.checkpoint(true);
        this.longTermStore.recover(recoveryListener);
    }

    @Override // org.activemq.service.Service
    public void start() throws JMSException {
        this.longTermStore.start();
    }

    @Override // org.activemq.service.Service
    public void stop() throws JMSException {
        this.longTermStore.stop();
    }

    public MessageStore getLongTermMessageStore() {
        return this.longTermStore;
    }

    @Override // org.activemq.store.cache.CacheMessageStoreAware
    public void setCacheMessageStore(CacheMessageStore cacheMessageStore) {
        this.cacheMessageStore = cacheMessageStore;
        if (this.longTermStore instanceof CacheMessageStoreAware) {
            ((CacheMessageStoreAware) this.longTermStore).setCacheMessageStore(cacheMessageStore);
        }
    }

    @Override // org.activemq.store.MessageStore
    public void removeAllMessages() throws JMSException {
        this.peristenceAdapter.checkpoint(true);
        this.longTermStore.removeAllMessages();
    }

    public void replayAddMessage(ActiveMQMessage activeMQMessage) {
        try {
            if (this.longTermStore.getMessage(activeMQMessage.getJMSMessageIdentity()) == null) {
                this.longTermStore.addMessage(activeMQMessage);
            }
        } catch (Throwable th) {
            log.debug(new StringBuffer().append("Could not replay add for message '").append(activeMQMessage.getJMSMessageIdentity().getMessageID()).append("'.  Message may have already been added. reason: ").append(th).toString());
        }
    }

    public void replayRemoveMessage(MessageAck messageAck) {
        try {
            if (this.longTermStore.getMessage(messageAck.getMessageIdentity()) != null) {
                this.longTermStore.removeMessage(messageAck);
            }
        } catch (Throwable th) {
            log.debug(new StringBuffer().append("Could not replay acknowledge for message '").append(messageAck.getMessageID()).append("'.  Message may have already been acknowledged. reason: ").append(th).toString());
        }
    }

    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            throw new NoClassDefFoundError().initCause(e);
        }
    }

    static {
        Class cls;
        if (class$org$activemq$store$journal$JournalMessageStore == null) {
            cls = class$("org.activemq.store.journal.JournalMessageStore");
            class$org$activemq$store$journal$JournalMessageStore = cls;
        } else {
            cls = class$org$activemq$store$journal$JournalMessageStore;
        }
        log = LogFactory.getLog(cls);
    }
}
