001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.flume.appender;
018    
019    import org.apache.avro.AvroRemoteException;
020    import org.apache.avro.ipc.NettyTransceiver;
021    import org.apache.avro.ipc.Transceiver;
022    import org.apache.avro.ipc.specific.SpecificRequestor;
023    import org.apache.flume.source.avro.AvroFlumeEvent;
024    import org.apache.flume.source.avro.AvroSourceProtocol;
025    import org.apache.flume.source.avro.Status;
026    import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
027    import org.apache.logging.log4j.core.appender.ManagerFactory;
028    
029    import java.io.IOException;
030    import java.net.InetSocketAddress;
031    import java.nio.ByteBuffer;
032    import java.util.ArrayList;
033    import java.util.HashMap;
034    import java.util.List;
035    import java.util.Map;
036    
037    /**
038     * Manager for FlumeAvroAppenders.
039     */
040    public class FlumeAvroManager extends AbstractFlumeManager {
041    
042        /**
043          The default reconnection delay (500 milliseconds or .5 seconds).
044         */
045        public static final int DEFAULT_RECONNECTION_DELAY   = 500;
046    
047        private static final int DEFAULT_RECONNECTS = 3;
048    
049        private static ManagerFactory factory = new AvroManagerFactory();
050    
051        private AvroSourceProtocol client;
052    
053        private final Agent[] agents;
054    
055        private final int batchSize;
056    
057        private final EventList events = new EventList();
058    
059        private int current = 0;
060    
061        private Transceiver transceiver;
062    
063        /**
064         * Constructor
065         * @param name The unique name of this manager.
066         * @param agents An array of Agents.
067         * @param batchSize The number of evetns to include in a batch.
068         */
069        protected FlumeAvroManager(String name, String shortName, Agent[] agents, int batchSize) {
070            super(name);
071            this.agents = agents;
072            this.batchSize = batchSize;
073            this.client = connect(agents);
074        }
075    
076        /**
077         * Returns a FlumeAvroManager.
078         * @param agents The agents to use.
079         * @param batchSize The number of events to include in a batch.
080         * @return A FlumeAvroManager.
081         */
082        public static FlumeAvroManager getManager(String name, Agent[] agents, int batchSize) {
083            if (agents == null || agents.length == 0) {
084                throw new IllegalArgumentException("At least one agent is required");
085            }
086    
087            if (batchSize <= 0) {
088                batchSize = 1;
089            }
090    
091            StringBuilder sb = new StringBuilder("FlumeAvro[");
092            boolean first = true;
093            for (Agent agent : agents) {
094                if (!first) {
095                    sb.append(",");
096                }
097                sb.append(agent.getHost()).append(":").append(agent.getPort());
098                first = false;
099            }
100            sb.append("]");
101            return (FlumeAvroManager) getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize));
102        }
103    
104        /**
105         * Returns the agents.
106         * @return The agent array.
107         */
108        public Agent[] getAgents() {
109            return agents;
110        }
111    
112        /**
113         * Returns the index of the current agent.
114         * @return The index for the current agent.
115         */
116        public int getCurrent() {
117            return current;
118        }
119    
120        @Override
121        public synchronized void send(FlumeEvent event, int delay, int retries)  {
122            if (delay == 0) {
123                delay = DEFAULT_RECONNECTION_DELAY;
124            }
125            if (retries == 0) {
126                retries = DEFAULT_RECONNECTS;
127            }
128            AvroFlumeEvent avroEvent = new AvroFlumeEvent();
129            avroEvent.body = ByteBuffer.wrap(event.getBody());
130            avroEvent.headers = new HashMap<CharSequence, CharSequence>();
131    
132            for (Map.Entry<String, String> entry : event.getHeaders().entrySet()) {
133              avroEvent.headers.put(entry.getKey(), entry.getValue());
134            }
135    
136            List<AvroFlumeEvent> batch = batchSize > 1 ? events.addAndGet(avroEvent, batchSize) : null;
137            if (batch == null && batchSize > 1) {
138                return;
139            }
140    
141            int i = 0;
142    
143            String msg = "Error writing to " + getName();
144    
145            do {
146                try {
147                    Status status = (batch == null) ? client.append(avroEvent) : client.appendBatch(batch);
148                    if (!status.equals(Status.OK)) {
149                        throw new AvroRemoteException("RPC communication failed to " + agents[current].getHost() +
150                            ":" + agents[current].getPort());
151                    }
152                    return;
153                } catch (Exception ex) {
154                    if (i == retries - 1) {
155                        msg = "Error writing to " + getName() + " at " + agents[current].getHost() + ":" +
156                            agents[current].getPort();
157                        LOGGER.warn(msg, ex);
158                        break;
159                    }
160                    sleep(delay);
161                }
162            } while (++i < retries);
163    
164            for (int index = 0; index < agents.length; ++index) {
165                if (index == current) {
166                    continue;
167                }
168                Agent agent = agents[index];
169                i = 0;
170                do {
171                    try {
172                        transceiver = null;
173                        AvroSourceProtocol c = connect(agent.getHost(), agent.getPort());
174                        Status status = (batch == null) ? c.append(avroEvent) : c.appendBatch(batch);
175                        if (!status.equals(Status.OK)) {
176                            if (i == retries - 1) {
177                                String warnMsg = "RPC communication failed to " + getName() + " at " +
178                                    agent.getHost() + ":" + agent.getPort();
179                                LOGGER.warn(warnMsg);
180                            }
181                            continue;
182                        }
183                        client = c;
184                        current = i;
185                        return;
186                    } catch (Exception ex) {
187                        if (i == retries - 1) {
188                            String warnMsg = "Error writing to " + getName() + " at " + agent.getHost() + ":" +
189                                agent.getPort();
190                            LOGGER.warn(warnMsg, ex);
191                            break;
192                        }
193                        sleep(delay);
194                    }
195                } while (++i < retries);
196            }
197    
198            throw new AppenderRuntimeException(msg);
199    
200        }
201    
202        private void sleep(int delay) {
203            try {
204                Thread.sleep(delay);
205            } catch (InterruptedException ex) {
206                Thread.currentThread().interrupt();
207            }
208        }
209    
210        /**
211         * There is a very good chance that this will always return the first agent even if it isn't available.
212         * @param agents The list of agents to choose from
213         * @return The FlumeEventAvroServer.
214         */
215        private AvroSourceProtocol connect(Agent[] agents) {
216            int i = 0;
217            for (Agent agent : agents) {
218                AvroSourceProtocol server = connect(agent.getHost(), agent.getPort());
219                if (server != null) {
220                    current = i;
221                    return server;
222                }
223                ++i;
224            }
225            throw new AppenderRuntimeException("Unable to connect to any agents");
226        }
227    
228        private AvroSourceProtocol connect(String hostname, int port) {
229            try {
230                if (transceiver == null) {
231                    transceiver = new NettyTransceiver(new InetSocketAddress(hostname, port));
232                }
233            } catch (IOException ioe) {
234                LOGGER.error("Unable to create transceiver", ioe);
235                return null;
236            }
237            try {
238                return SpecificRequestor.getClient(AvroSourceProtocol.class, transceiver);
239            } catch (IOException ioe) {
240                LOGGER.error("Unable to create Avro client");
241                return null;
242            }
243        }
244    
245        @Override
246        protected void releaseSub() {
247            if (transceiver != null) {
248                try {
249                    transceiver.close();
250                } catch (IOException ioe) {
251                    LOGGER.error("Attempt to clean up Avro transceiver failed", ioe);
252                }
253            }
254            client = null;
255        }
256    
257        /**
258         * Thread-safe List management of a batch.
259         */
260        private static class EventList extends ArrayList<AvroFlumeEvent> {
261    
262            public synchronized List<AvroFlumeEvent> addAndGet(AvroFlumeEvent event, int batchSize) {
263                super.add(event);
264                if (this.size() >= batchSize) {
265                    List<AvroFlumeEvent> events = new ArrayList<AvroFlumeEvent>();
266                    events.addAll(this);
267                    clear();
268                    return events;
269                } else {
270                    return null;
271                }
272            }
273        }
274    
275        /**
276         * Factory data.
277         */
278        private static class FactoryData {
279            private String name;
280            private Agent[] agents;
281            private int batchSize;
282    
283            /**
284             * Constructor.
285             * @param name The name of the Appender.
286             * @param agents The agents.
287             * @param batchSize The number of events to include in a batch.
288             */
289            public FactoryData(String name, Agent[] agents, int batchSize) {
290                this.name = name;
291                this.agents = agents;
292                this.batchSize = batchSize;
293            }
294        }
295    
296        /**
297         * Avro Manager Factory.
298         */
299        private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
300    
301            /**
302             * Create the FlumeAvroManager.
303             * @param name The name of the entity to manage.
304             * @param data The data required to create the entity.
305             * @return The FlumeAvroManager.
306             */
307            public FlumeAvroManager createManager(String name, FactoryData data) {
308                try {
309    
310                    return new FlumeAvroManager(name, data.name, data.agents, data.batchSize);
311                } catch (Exception ex) {
312                    LOGGER.error("Could not create FlumeAvroManager", ex);
313                }
314                return null;
315            }
316        }
317    
318    }