001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.flume.appender;
018    
019    import org.apache.avro.AvroRemoteException;
020    import org.apache.avro.ipc.NettyTransceiver;
021    import org.apache.avro.ipc.Transceiver;
022    import org.apache.avro.ipc.specific.SpecificRequestor;
023    import org.apache.flume.source.avro.AvroFlumeEvent;
024    import org.apache.flume.source.avro.AvroSourceProtocol;
025    import org.apache.flume.source.avro.Status;
026    import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
027    import org.apache.logging.log4j.core.appender.ManagerFactory;
028    
029    import java.io.IOException;
030    import java.net.InetSocketAddress;
031    import java.nio.ByteBuffer;
032    import java.util.ArrayList;
033    import java.util.HashMap;
034    import java.util.List;
035    import java.util.Map;
036    
037    /**
038     * Manager for FlumeAvroAppenders.
039     */
040    public class FlumeAvroManager extends AbstractFlumeManager {
041    
042        /**
043          The default reconnection delay (500 milliseconds or .5 seconds).
044         */
045        public static final int DEFAULT_RECONNECTION_DELAY   = 500;
046    
047        private static final int DEFAULT_RECONNECTS = 3;
048    
049        private static ManagerFactory factory = new AvroManagerFactory();
050    
051        private AvroSourceProtocol client;
052    
053        private final Agent[] agents;
054    
055        private final int batchSize;
056    
057        private final EventList events = new EventList();
058    
059        private int current = 0;
060    
061        private Transceiver transceiver;
062    
063        /**
064         * Constructor
065         * @param name The unique name of this manager.
066         * @param agents An array of Agents.
067         * @param batchSize The number of evetns to include in a batch.
068         */
069        protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize) {
070            super(name);
071            this.agents = agents;
072            this.batchSize = batchSize;
073            this.client = connect(agents);
074        }
075    
076        /**
077         * Returns a FlumeAvroManager.
078         * @param name The name of the manager.
079         * @param agents The agents to use.
080         * @param batchSize The number of events to include in a batch.
081         * @return A FlumeAvroManager.
082         */
083        public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize) {
084            if (agents == null || agents.length == 0) {
085                throw new IllegalArgumentException("At least one agent is required");
086            }
087    
088            if (batchSize <= 0) {
089                batchSize = 1;
090            }
091    
092            final StringBuilder sb = new StringBuilder("FlumeAvro[");
093            boolean first = true;
094            for (final Agent agent : agents) {
095                if (!first) {
096                    sb.append(",");
097                }
098                sb.append(agent.getHost()).append(":").append(agent.getPort());
099                first = false;
100            }
101            sb.append("]");
102            return (FlumeAvroManager) getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize));
103        }
104    
105        /**
106         * Returns the agents.
107         * @return The agent array.
108         */
109        public Agent[] getAgents() {
110            return agents;
111        }
112    
113        /**
114         * Returns the index of the current agent.
115         * @return The index for the current agent.
116         */
117        public int getCurrent() {
118            return current;
119        }
120    
121        @Override
122        public synchronized void send(final FlumeEvent event, int delay, int retries)  {
123            if (delay == 0) {
124                delay = DEFAULT_RECONNECTION_DELAY;
125            }
126            if (retries == 0) {
127                retries = DEFAULT_RECONNECTS;
128            }
129            if (client == null) {
130                client = connect(agents);
131            }
132            String msg = "No Flume agents are available";
133            if (client != null) {
134                final AvroFlumeEvent avroEvent = new AvroFlumeEvent();
135                avroEvent.setBody(ByteBuffer.wrap(event.getBody()));
136                avroEvent.setHeaders(new HashMap<CharSequence, CharSequence>());
137    
138                for (final Map.Entry<String, String> entry : event.getHeaders().entrySet()) {
139                    avroEvent.getHeaders().put(entry.getKey(), entry.getValue());
140                }
141    
142                final List<AvroFlumeEvent> batch = batchSize > 1 ? events.addAndGet(avroEvent, batchSize) : null;
143                if (batch == null && batchSize > 1) {
144                    return;
145                }
146    
147                int i = 0;
148    
149                msg = "Error writing to " + getName();
150    
151                do {
152                    try {
153                        final Status status = (batch == null) ? client.append(avroEvent) : client.appendBatch(batch);
154                        if (!status.equals(Status.OK)) {
155                            throw new AvroRemoteException("RPC communication failed to " + agents[current].getHost() +
156                                ":" + agents[current].getPort());
157                        }
158                        return;
159                    } catch (final Exception ex) {
160                        if (i == retries - 1) {
161                            msg = "Error writing to " + getName() + " at " + agents[current].getHost() + ":" +
162                                agents[current].getPort();
163                            LOGGER.warn(msg, ex);
164                            break;
165                        }
166                        sleep(delay);
167                    }
168                } while (++i < retries);
169    
170                for (int index = 0; index < agents.length; ++index) {
171                    if (index == current) {
172                        continue;
173                    }
174                    final Agent agent = agents[index];
175                    i = 0;
176                    do {
177                        try {
178                            transceiver = null;
179                            final AvroSourceProtocol c = connect(agent.getHost(), agent.getPort());
180                            final Status status = (batch == null) ? c.append(avroEvent) : c.appendBatch(batch);
181                            if (!status.equals(Status.OK)) {
182                                if (i == retries - 1) {
183                                    final String warnMsg = "RPC communication failed to " + getName() + " at " +
184                                        agent.getHost() + ":" + agent.getPort();
185                                    LOGGER.warn(warnMsg);
186                                }
187                                continue;
188                            }
189                            client = c;
190                            current = i;
191                            return;
192                        } catch (final Exception ex) {
193                            if (i == retries - 1) {
194                                final String warnMsg = "Error writing to " + getName() + " at " + agent.getHost() + ":" +
195                                    agent.getPort();
196                                LOGGER.warn(warnMsg, ex);
197                                break;
198                            }
199                            sleep(delay);
200                        }
201                    } while (++i < retries);
202                }
203            }
204    
205            throw new AppenderRuntimeException(msg);
206    
207        }
208    
209        private void sleep(final int delay) {
210            try {
211                Thread.sleep(delay);
212            } catch (final InterruptedException ex) {
213                Thread.currentThread().interrupt();
214            }
215        }
216    
217        /**
218         * There is a very good chance that this will always return the first agent even if it isn't available.
219         * @param agents The list of agents to choose from
220         * @return The FlumeEventAvroServer.
221         */
222        private AvroSourceProtocol connect(final Agent[] agents) {
223            int i = 0;
224            for (final Agent agent : agents) {
225                final AvroSourceProtocol server = connect(agent.getHost(), agent.getPort());
226                if (server != null) {
227                    current = i;
228                    return server;
229                }
230                ++i;
231            }
232            LOGGER.error("Flume manager " + getName() + " was unable to connect to any agents");
233            return null;
234        }
235    
236        private AvroSourceProtocol connect(final String hostname, final int port) {
237            try {
238                if (transceiver == null) {
239                    transceiver = new NettyTransceiver(new InetSocketAddress(hostname, port));
240                }
241            } catch (final IOException ioe) {
242                LOGGER.error("Unable to create transceiver", ioe);
243                return null;
244            }
245            try {
246                return SpecificRequestor.getClient(AvroSourceProtocol.class, transceiver);
247            } catch (final IOException ioe) {
248                LOGGER.error("Unable to create Avro client");
249                return null;
250            }
251        }
252    
253        @Override
254        protected void releaseSub() {
255            if (transceiver != null) {
256                try {
257                    transceiver.close();
258                } catch (final IOException ioe) {
259                    LOGGER.error("Attempt to clean up Avro transceiver failed", ioe);
260                }
261            }
262            client = null;
263        }
264    
265        /**
266         * Thread-safe List management of a batch.
267         */
268        private static class EventList extends ArrayList<AvroFlumeEvent> {
269    
270            /**
271             * Generated serial version ID.
272             */
273            private static final long serialVersionUID = -1599817377315957495L;
274    
275            public synchronized List<AvroFlumeEvent> addAndGet(final AvroFlumeEvent event, final int batchSize) {
276                super.add(event);
277                if (this.size() >= batchSize) {
278                    final List<AvroFlumeEvent> events = new ArrayList<AvroFlumeEvent>();
279                    events.addAll(this);
280                    clear();
281                    return events;
282                } else {
283                    return null;
284                }
285            }
286        }
287    
288        /**
289         * Factory data.
290         */
291        private static class FactoryData {
292            private final String name;
293            private final Agent[] agents;
294            private final int batchSize;
295    
296            /**
297             * Constructor.
298             * @param name The name of the Appender.
299             * @param agents The agents.
300             * @param batchSize The number of events to include in a batch.
301             */
302            public FactoryData(final String name, final Agent[] agents, final int batchSize) {
303                this.name = name;
304                this.agents = agents;
305                this.batchSize = batchSize;
306            }
307        }
308    
309        /**
310         * Avro Manager Factory.
311         */
312        private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
313    
314            /**
315             * Create the FlumeAvroManager.
316             * @param name The name of the entity to manage.
317             * @param data The data required to create the entity.
318             * @return The FlumeAvroManager.
319             */
320            public FlumeAvroManager createManager(final String name, final FactoryData data) {
321                try {
322    
323                    return new FlumeAvroManager(name, data.name, data.agents, data.batchSize);
324                } catch (final Exception ex) {
325                    LOGGER.error("Could not create FlumeAvroManager", ex);
326                }
327                return null;
328            }
329        }
330    
331    }