001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.flume.appender;
018
019import java.util.Properties;
020
021import org.apache.flume.Event;
022import org.apache.flume.api.RpcClient;
023import org.apache.flume.api.RpcClientFactory;
024import org.apache.logging.log4j.core.appender.AppenderLoggingException;
025import org.apache.logging.log4j.core.appender.ManagerFactory;
026
027/**
028 * Manager for FlumeAvroAppenders.
029 */
030public class FlumeAvroManager extends AbstractFlumeManager {
031
032    private static final int MAX_RECONNECTS = 3;
033    private static final int MINIMUM_TIMEOUT = 1000;
034
035    private static AvroManagerFactory factory = new AvroManagerFactory();
036
037    private final Agent[] agents;
038
039    private final int batchSize;
040
041    private final int retries;
042
043    private final int connectTimeout;
044
045    private final int requestTimeout;
046
047    private final int current = 0;
048
049    private RpcClient rpcClient = null;
050
051    /**
052     * Constructor
053     * @param name The unique name of this manager.
054     * @param agents An array of Agents.
055     * @param batchSize The number of events to include in a batch.
056     * @param retries The number of times to retry connecting before giving up.
057     * @param connectTimeout The connection timeout in ms.
058     * @param requestTimeout The request timeout in ms.
059     *
060     */
061    protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
062                               final int retries, final int connectTimeout, final int requestTimeout) {
063        super(name);
064        this.agents = agents;
065        this.batchSize = batchSize;
066        this.retries = retries;
067        this.connectTimeout = connectTimeout;
068        this.requestTimeout = requestTimeout;
069        this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
070    }
071
072    /**
073     * Returns a FlumeAvroManager.
074     * @param name The name of the manager.
075     * @param agents The agents to use.
076     * @param batchSize The number of events to include in a batch.
077     * @param retries The number of times to retry connecting before giving up.
078     * @param connectTimeout The connection timeout in ms.
079     * @param requestTimeout The request timeout in ms.
080     * @return A FlumeAvroManager.
081     */
082    public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize,
083                                              final int retries, final int connectTimeout, final int requestTimeout) {
084        if (agents == null || agents.length == 0) {
085            throw new IllegalArgumentException("At least one agent is required");
086        }
087
088        if (batchSize <= 0) {
089            batchSize = 1;
090        }
091
092        final StringBuilder sb = new StringBuilder("FlumeAvro[");
093        boolean first = true;
094        for (final Agent agent : agents) {
095            if (!first) {
096                sb.append(",");
097            }
098            sb.append(agent.getHost()).append(":").append(agent.getPort());
099            first = false;
100        }
101        sb.append("]");
102        return getManager(sb.toString(), factory,
103                new FactoryData(name, agents, batchSize, retries, connectTimeout, requestTimeout));
104    }
105
106    /**
107     * Returns the agents.
108     * @return The agent array.
109     */
110    public Agent[] getAgents() {
111        return agents;
112    }
113
114    /**
115     * Returns the index of the current agent.
116     * @return The index for the current agent.
117     */
118    public int getCurrent() {
119        return current;
120    }
121
122    public int getRetries() {
123        return retries;
124    }
125
126    public int getConnectTimeout() {
127        return connectTimeout;
128    }
129
130    public int getRequestTimeout() {
131        return requestTimeout;
132    }
133
134    public int getBatchSize() {
135        return batchSize;
136    }
137
138    public synchronized void send(final BatchEvent events) {
139        if (rpcClient == null) {
140            rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
141        }
142
143        if (rpcClient != null) {
144            try {
145                LOGGER.trace("Sending batch of {} events", events.getEvents().size());
146                rpcClient.appendBatch(events.getEvents());
147            } catch (final Exception ex) {
148                rpcClient.close();
149                rpcClient = null;
150                final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" +
151                    agents[current].getPort();
152                LOGGER.warn(msg, ex);
153                throw new AppenderLoggingException("No Flume agents are available");
154            }
155        }  else {
156            final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" +
157                agents[current].getPort();
158            LOGGER.warn(msg);
159            throw new AppenderLoggingException("No Flume agents are available");
160        }
161    }
162
163    @Override
164    public synchronized void send(final Event event)  {
165        if (rpcClient == null) {
166            rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
167        }
168
169        if (rpcClient != null) {
170            try {
171                rpcClient.append(event);
172            } catch (final Exception ex) {
173                rpcClient.close();
174                rpcClient = null;
175                final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" +
176                    agents[current].getPort();
177                LOGGER.warn(msg, ex);
178                throw new AppenderLoggingException("No Flume agents are available");
179            }
180        } else {
181            final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" +
182                agents[current].getPort();
183            LOGGER.warn(msg);
184            throw new AppenderLoggingException("No Flume agents are available");
185        }
186    }
187
188    /**
189     * There is a very good chance that this will always return the first agent even if it isn't available.
190     * @param agents The list of agents to choose from
191     * @return The FlumeEventAvroServer.
192     */
193
194    private RpcClient connect(final Agent[] agents, int retries, final int connectTimeout, final int requestTimeout) {
195        try {
196            final Properties props = new Properties();
197
198            props.put("client.type", agents.length > 1 ? "default_failover" : "default");
199
200            int count = 1;
201            final StringBuilder sb = new StringBuilder();
202            for (final Agent agent : agents) {
203                if (sb.length() > 0) {
204                    sb.append(" ");
205                }
206                final String hostName = "host" + count++;
207                props.put("hosts." + hostName, agent.getHost() + ":" + agent.getPort());
208                sb.append(hostName);
209            }
210            props.put("hosts", sb.toString());
211            if (batchSize > 0) {
212                props.put("batch-size", Integer.toString(batchSize));
213            }
214            if (retries > 1) {
215                if (retries > MAX_RECONNECTS) {
216                    retries = MAX_RECONNECTS;
217                }
218                props.put("max-attempts", Integer.toString(retries * agents.length));
219            }
220            if (requestTimeout >= MINIMUM_TIMEOUT) {
221                props.put("request-timeout", Integer.toString(requestTimeout));
222            }
223            if (connectTimeout >= MINIMUM_TIMEOUT) {
224                props.put("connect-timeout", Integer.toString(connectTimeout));
225            }
226            return RpcClientFactory.getInstance(props);
227        } catch (final Exception ex) {
228            LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage());
229            return null;
230        }
231    }
232
233    @Override
234    protected void releaseSub() {
235        if (rpcClient != null) {
236            try {
237                rpcClient.close();
238            } catch (final Exception ex) {
239                LOGGER.error("Attempt to close RPC client failed", ex);
240            }
241        }
242        rpcClient = null;
243    }
244
245    /**
246     * Factory data.
247     */
248    private static class FactoryData {
249        private final String name;
250        private final Agent[] agents;
251        private final int batchSize;
252        private final int retries;
253        private final int conntectTimeout;
254        private final int requestTimeout;
255
256        /**
257         * Constructor.
258         * @param name The name of the Appender.
259         * @param agents The agents.
260         * @param batchSize The number of events to include in a batch.
261         */
262        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
263                           final int connectTimeout, final int requestTimeout) {
264            this.name = name;
265            this.agents = agents;
266            this.batchSize = batchSize;
267            this.retries = retries;
268            this.conntectTimeout = connectTimeout;
269            this.requestTimeout = requestTimeout;
270        }
271    }
272
273    /**
274     * Avro Manager Factory.
275     */
276    private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
277
278        /**
279         * Create the FlumeAvroManager.
280         * @param name The name of the entity to manage.
281         * @param data The data required to create the entity.
282         * @return The FlumeAvroManager.
283         */
284        @Override
285        public FlumeAvroManager createManager(final String name, final FactoryData data) {
286            try {
287
288                return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.retries,
289                    data.conntectTimeout, data.requestTimeout);
290            } catch (final Exception ex) {
291                LOGGER.error("Could not create FlumeAvroManager", ex);
292            }
293            return null;
294        }
295    }
296
297}