001    /**
002     *
003     * Licensed to the Apache Software Foundation (ASF) under one or more
004     * contributor license agreements.  See the NOTICE file distributed with
005     * this work for additional information regarding copyright ownership.
006     * The ASF licenses this file to You under the Apache License, Version 2.0
007     * (the "License"); you may not use this file except in compliance with
008     * the License.  You may obtain a copy of the License at
009     *
010     * http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.camel.component.xmpp;
019    
020    import org.apache.camel.Consumer;
021    import org.apache.camel.Processor;
022    import org.apache.camel.impl.DefaultConsumer;
023    import org.apache.commons.logging.Log;
024    import org.apache.commons.logging.LogFactory;
025    import org.jivesoftware.smack.PacketListener;
026    import org.jivesoftware.smack.packet.Message;
027    import org.jivesoftware.smack.packet.Packet;
028    import org.jivesoftware.smack.packet.RosterPacket;
029    
030    import java.util.Iterator;
031    
032    /**
033     * A {@link Consumer} which listens to XMPP packets
034     *
035     * @version $Revision: 534145 $
036     */
037    public class XmppConsumer extends DefaultConsumer<XmppExchange> implements PacketListener {
038        private static final transient Log log = LogFactory.getLog(XmppConsumer.class);
039        private final XmppEndpoint endpoint;
040    
041        public XmppConsumer(XmppEndpoint endpoint, Processor processor) {
042            super(endpoint, processor);
043            this.endpoint = endpoint;
044        }
045    
046        @Override
047        protected void doStart() throws Exception {
048            super.doStart();
049            endpoint.getConnection().addPacketListener(this, endpoint.getFilter());
050        }
051    
052        @Override
053        protected void doStop() throws Exception {
054            endpoint.getConnection().removePacketListener(this);
055            super.doStop();
056        }
057    
058        public void processPacket(Packet packet) {
059    
060            if (packet instanceof Message) {
061                Message message = (Message) packet;
062                if (log.isDebugEnabled()) {
063                    log.debug("<<<< message: " + message.getBody());
064                }
065                XmppExchange exchange = endpoint.createExchange(message);
066                try {
067                                    getProcessor().process(exchange);
068                            } catch (Exception e) {
069                                    // TODO: what should we do when a processing failure occurs??
070                                    e.printStackTrace();
071                            }
072            }
073            else if (packet instanceof RosterPacket) {
074                RosterPacket rosterPacket = (RosterPacket) packet;
075                if (log.isDebugEnabled()) {
076                    log.debug("Roster packet with : " + rosterPacket.getRosterItemCount() + " item(s)");
077                    Iterator rosterItems = rosterPacket.getRosterItems();
078                    while (rosterItems.hasNext()) {
079                        Object item = rosterItems.next();
080                        log.debug("Roster item: " + item);
081                    }
082                }
083            }
084            else {
085                if (log.isDebugEnabled()) {
086                    log.debug("<<<< ignored packet: " + packet);
087                }
088    
089            }
090        }
091    }