001    /**
002     *
003     * Licensed to the Apache Software Foundation (ASF) under one or more
004     * contributor license agreements.  See the NOTICE file distributed with
005     * this work for additional information regarding copyright ownership.
006     * The ASF licenses this file to You under the Apache License, Version 2.0
007     * (the "License"); you may not use this file except in compliance with
008     * the License.  You may obtain a copy of the License at
009     *
010     * http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.camel.component.cxf;
019    
020    import org.apache.camel.Processor;
021    import org.apache.camel.impl.DefaultConsumer;
022    import org.apache.cxf.endpoint.ServerImpl;
023    import org.apache.cxf.frontend.ServerFactoryBean;
024    import org.apache.cxf.message.Message;
025    
026    /**
027     * A consumer of exchanges for a service in CXF
028     *
029     * @version $Revision: 534145 $
030     */
031    public class CxfInvokeConsumer extends DefaultConsumer<CxfExchange> {
032        protected CxfInvokeEndpoint cxfEndpoint;
033        private ServerImpl server;
034    
035        public CxfInvokeConsumer(CxfInvokeEndpoint endpoint, Processor processor) {
036            super(endpoint, processor);
037            this.cxfEndpoint = endpoint;
038        }
039    
040        @Override
041        protected void doStart() throws Exception {
042            super.doStart();
043            // TODO we need to add custom cxf message observer and wire the
044            // incomingCxfMessage method.  Also, custom cxf interceptors are
045            // needed in order to object SOAP/XML message.  Currently, the
046            // CXF service invoker will invoke the service class.
047            if (server != null) {
048                // start a cxf service
049                ServerFactoryBean svrBean = new ServerFactoryBean();
050                svrBean.setAddress(getEndpoint().getEndpointUri());
051                svrBean.setServiceClass(Class.forName(cxfEndpoint.getProperty(CxfConstants.IMPL)));
052                svrBean.setBus(cxfEndpoint.getBus());
053    
054                server = (ServerImpl) svrBean.create();
055                server.start();
056            }
057        }
058    
059        @Override
060        protected void doStop() throws Exception {
061            if (server != null) {
062                server.stop();
063                server = null;
064            }
065            super.doStop();
066        }
067    
068        // TODO this method currently is not being called.
069        protected void incomingCxfMessage(Message message) {
070            try {
071                            CxfExchange exchange = cxfEndpoint.createExchange(message);
072                            getProcessor().process(exchange);
073                    } catch (Exception e) {
074                            // TODO: what do do if we are getting processing errors from camel?  Shutdown?
075                            e.printStackTrace();
076                    }
077        }
078    }