Flume - Component Object Initialization (2)
Published: 2019-06-29


Overview

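The previous post covered how Flume loads component configuration and starts its components, but it skipped component initialization, that is, how the components are built from the configuration. This post fills that gap.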

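The goal is to understand how the source, channel and sink components are created from the parsed configuration.
A later post will walk through one example per component to explain how each one is started, so that initialization and startup are both covered. This post concentrates on initialization.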

Configuration loading

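  • First, recall how the configuration is loaded. As the comments in the source show, parsing has two steps: step 1 parses the raw properties, and step 2 re-parses the result of step 1 to produce the configuration specific to each source, channel and sink.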
public FlumeConfiguration(Map<String, String> properties) {
  agentConfigMap = new HashMap<>();
  errors = new LinkedList<>();
  // step 1: parse the properties into raw source, channel and sink settings
  for (Entry<String, String> entry : properties.entrySet()) {
    if (!addRawProperty(entry.getKey(), entry.getValue())) {
      LOGGER.warn("Configuration property ignored: {} = {}", entry.getKey(), entry.getValue());
    }
  }
  // step 2: re-parse the result of step 1 according to the properties of each
  // source, channel and sink
  validateConfiguration();
}
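To make step 1 more concrete, here is a minimal sketch of grouping flat property keys by agent and component. It assumes the usual Flume key shape `<agent>.<kind>.<component>.<key>` (for example `a1.sources.r1.type`). The class `RawPropertySketch` and its `group` method are hypothetical names for illustration, not the actual `addRawProperty` implementation, which also handles the name lists and sink groups.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified model of step 1; not Flume code. */
class RawPropertySketch {

    /** Returns agent -> "kind.component" -> (sub-key -> value). */
    static Map<String, Map<String, Map<String, String>>> group(Map<String, String> props) {
        Map<String, Map<String, Map<String, String>>> agents = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            // Split into at most 4 parts so that sub-keys such as "selector.type" stay intact.
            String[] parts = e.getKey().split("\\.", 4);
            if (parts.length < 4) {
                continue; // e.g. "a1.sources = r1" only lists names; ignored in this sketch
            }
            agents.computeIfAbsent(parts[0], k -> new LinkedHashMap<>())
                  .computeIfAbsent(parts[1] + "." + parts[2], k -> new LinkedHashMap<>())
                  .put(parts[3], e.getValue());
        }
        return agents;
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("a1.sources", "r1");
        props.put("a1.sources.r1.type", "netcat");
        props.put("a1.sources.r1.port", "44444");
        props.put("a1.channels.c1.type", "memory");
        System.out.println(group(props));
    }
}
```

The per-component inner map plays the role that a Context plays in Flume: an untyped bag of key/value pairs that step 2 interprets later.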
  • Compare this with Flume's AgentConfiguration object. The comments in the excerpt below mark which fields are initialized in step 1 and which in step 2.
public static class AgentConfiguration {
  // initialized in step 1
  private final String agentName;
  private String configFilters;
  private String sources;
  private String sinks;
  private String channels;
  private String sinkgroups;

  // initialized in step 2
  private final Map<String, ComponentConfiguration> sourceConfigMap;
  private final Map<String, ComponentConfiguration> sinkConfigMap;
  private final Map<String, ComponentConfiguration> channelConfigMap;
  private final Map<String, ComponentConfiguration> sinkgroupConfigMap;
  private final Map<String, ComponentConfiguration> configFilterConfigMap;

  // initialized in step 1
  private Map<String, Context> configFilterContextMap;
  private Map<String, Context> sourceContextMap;
  private Map<String, Context> sinkContextMap;
  private Map<String, Context> channelContextMap;
  private Map<String, Context> sinkGroupContextMap;

  // initialized in step 2
  private Set<String> sinkSet;
  private Set<String> configFilterSet;
  private Set<String> sourceSet;
  private Set<String> channelSet;
  private Set<String> sinkgroupSet;

  private final List<FlumeConfigurationError> errorList;
  private List<ConfigFilter> configFiltersInstances;
  private Map<String, Pattern> configFilterPatternCache;
  // ... (methods omitted)
}
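  • Next we break down step 2 (validateConfiguration), because it sets up the variables that component initialization relies on later. agentConfigMap maps each agent name to its AgentConfiguration. The method iterates over every AgentConfiguration and validates it through aconf.isValid(), which we follow next.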
private void validateConfiguration() {
  Set<Entry<String, AgentConfiguration>> entries = agentConfigMap.entrySet();
  Iterator<Entry<String, AgentConfiguration>> it = entries.iterator();
  while (it.hasNext()) {
    Entry<String, AgentConfiguration> next = it.next();
    String agentName = next.getKey();
    AgentConfiguration aconf = next.getValue();
    // aconf is the configuration of one agent; validate it as a whole
    if (!aconf.isValid()) {
      LOGGER.warn("Agent configuration invalid for agent '{}'. It will be removed.", agentName);
      addError(agentName, AGENT_CONFIGURATION_INVALID, ERROR);
      it.remove();
    }
    LOGGER.debug("Channels:{}\n", aconf.channels);
    LOGGER.debug("Sinks {}\n", aconf.sinks);
    LOGGER.debug("Sources {}\n", aconf.sources);
  }
}
  • In aconf.isValid() we can see configFilterSet, channelSet, sourceSet, sinkSet and sinkgroupSet being initialized, which confirms that these are the step 2 variables. We then follow validateChannels, validateSources and validateSinks, because channel, source and sink are Flume's core components.
private boolean isValid() {
  LOGGER.debug("Starting validation of configuration for agent: {}", agentName);
  if (LOGGER.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
    LOGGER.debug("Initial configuration: {}", getPrevalidationConfig());
  }

  configFilterSet = validateConfigFilterSet();
  createConfigFilters();
  runFiltersThroughConfigs();

  // Make sure that at least one channel is specified
  if (channels == null || channels.trim().isEmpty()) {
    LOGGER.warn(
        "Agent configuration for '{}' does not contain any channels. Marking it as invalid.",
        agentName
    );
    addError(CONFIG_CHANNELS, PROPERTY_VALUE_NULL, ERROR);
    return false;
  }

  // split out all channel names; \\s+ matches any run of whitespace
  channelSet = new HashSet<>(Arrays.asList(channels.split("\\s+")));
  // the key step: validate each channel
  channelSet = validateChannels(channelSet);
  if (channelSet.isEmpty()) {
    LOGGER.warn(
        "Agent configuration for '{}' does not contain any valid channels. " +
            "Marking it as invalid.",
        agentName
    );
    addError(CONFIG_CHANNELS, PROPERTY_VALUE_NULL, ERROR);
    return false;
  }

  // core logic for sources, sinks and sink groups, checked against the valid channels
  sourceSet = validateSources(channelSet);
  sinkSet = validateSinks(channelSet);
  sinkgroupSet = validateGroups(sinkSet);

  // If no sources or sinks are present, then this is invalid
  if (sourceSet.isEmpty() && sinkSet.isEmpty()) {
    LOGGER.warn(
        "Agent configuration for '{}' has no sources or sinks. Will be marked invalid.",
        agentName
    );
    addError(CONFIG_SOURCES, PROPERTY_VALUE_NULL, ERROR);
    addError(CONFIG_SINKS, PROPERTY_VALUE_NULL, ERROR);
    return false;
  }

  // Now rewrite the sources/sinks/channels
  this.configFilters = getSpaceDelimitedList(configFilterSet);
  sources = getSpaceDelimitedList(sourceSet);
  channels = getSpaceDelimitedList(channelSet);
  sinks = getSpaceDelimitedList(sinkSet);
  sinkgroups = getSpaceDelimitedList(sinkgroupSet);

  if (LOGGER.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
    LOGGER.debug("Post validation configuration for {}", agentName);
    LOGGER.debug(getPostvalidationConfig());
  }
  return true;
}

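  • Channel validation: validateChannels. The key point is that channels with a configuration object are placed in channelConfigMap, while channels without one are placed in channelContextMap. ComponentConfigurationFactory.create builds the configuration from the channel type (the supported types are listed in the source further down). When the configuration class for a type cannot be found, create still returns a ChannelConfiguration, but it is flagged isNotFoundConfigClass, so the channel ends up in channelContextMap.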
private Set<String> validateChannels(Set<String> channelSet) {
  Iterator<String> iter = channelSet.iterator();
  Map<String, Context> newContextMap = new HashMap<>();
  ChannelConfiguration conf = null;
  // examine each channel in turn
  while (iter.hasNext()) {
    String channelName = iter.next();
    // at this point channelContextMap holds the raw settings of every channel
    Context channelContext = channelContextMap.get(channelName);
    // Context exists in map.
    if (channelContext != null) {
      // normally the type read here is a known one such as file, so the else branch is taken
      ChannelType chType = getKnownChannel(channelContext.getString(
          BasicConfigurationConstants.CONFIG_TYPE));
      boolean configSpecified = false;
      String config = null;
      if (chType == null) {
        // unimportant code omitted
      } else {
        config = chType.toString().toUpperCase(Locale.ENGLISH);
        configSpecified = true;
      }
      try {
        // create the ChannelConfiguration that matches the ChannelType
        conf = (ChannelConfiguration) ComponentConfigurationFactory.create(
            channelName, config, ComponentType.CHANNEL);
        // initialize conf from the original channelContext
        if (conf != null) {
          conf.configure(channelContext);
        }
        // channels without a configuration object go into newContextMap
        if ((configSpecified && conf.isNotFoundConfigClass()) || !configSpecified) {
          newContextMap.put(channelName, channelContext);
        } else if (configSpecified) {
          // channels with a configuration object go into channelConfigMap
          channelConfigMap.put(channelName, conf);
        }
        if (conf != null) {
          errorList.addAll(conf.getErrors());
        }
      } catch (ConfigurationException e) {
        // error handling omitted in this excerpt
      }
    }
  }
  // channelContextMap now holds the channels without a configuration object,
  // channelConfigMap the channels with one
  channelContextMap = newContextMap;
  Set<String> tempchannelSet = new HashSet<String>();
  tempchannelSet.addAll(channelConfigMap.keySet());
  tempchannelSet.addAll(channelContextMap.keySet());
  channelSet.retainAll(tempchannelSet);
  return channelSet;
}
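The partitioning idea in validateChannels can be sketched without any Flume classes. This is a simplified model under stated assumptions: `ChannelPartitionSketch`, `TYPES_WITH_CONFIG_CLASS` and the plain `String` values are hypothetical stand-ins for ChannelConfiguration and Context objects. It only illustrates the whitespace split, the two target maps and the final retainAll.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the "config map vs. context map" split; not Flume code. */
class ChannelPartitionSketch {
    // Assumption for illustration: types for which a dedicated configuration class can be loaded.
    static final Set<String> TYPES_WITH_CONFIG_CLASS = new HashSet<>(Arrays.asList("FILE"));

    final Map<String, String> configMap = new HashMap<>();  // name -> type, has a configuration object
    final Map<String, String> contextMap = new HashMap<>(); // name -> type, context only

    /** Returns the declared names that ended up in one of the two maps. */
    Set<String> validate(String channels, Map<String, String> typeByName) {
        Set<String> names = new HashSet<>(Arrays.asList(channels.trim().split("\\s+")));
        for (String name : names) {
            String type = typeByName.get(name);
            if (type == null) {
                continue; // declared but never configured; removed by retainAll below
            }
            if (TYPES_WITH_CONFIG_CLASS.contains(type.toUpperCase(Locale.ENGLISH))) {
                configMap.put(name, type);
            } else {
                contextMap.put(name, type);
            }
        }
        Set<String> valid = new HashSet<>(configMap.keySet());
        valid.addAll(contextMap.keySet());
        names.retainAll(valid);
        return names;
    }

    public static void main(String[] args) {
        Map<String, String> types = new HashMap<>();
        types.put("c1", "file");
        types.put("c2", "com.example.CustomChannel");
        ChannelPartitionSketch sketch = new ChannelPartitionSketch();
        System.out.println(sketch.validate("c1  c2\tc3", types));
    }
}
```

Every valid name lands in exactly one of the two maps, which is why the later load step can iterate over the same name set twice, once per map.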
  • These are all the supported channel types. They are not explained one by one here; later posts will analyze each channel.
public enum ChannelType implements ComponentWithClassName {
  OTHER(null),
  FILE("org.apache.flume.channel.file.FileChannel"),
  MEMORY("org.apache.flume.channel.MemoryChannel"),
  JDBC("org.apache.flume.channel.jdbc.JdbcChannel"),
  SPILLABLEMEMORY("org.apache.flume.channel.SpillableMemoryChannel");
  // ... (fields and methods omitted)
}

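  • Source validation: validateSources. The key point is that sources with a configuration object are placed in sourceConfigMap, while sources without one are placed in sourceContextMap. ComponentConfigurationFactory.create builds the configuration from the source type (the supported types are listed in the source further down). When the configuration class for a type cannot be found, create still returns a SourceConfiguration, but it is flagged isNotFoundConfigClass, so the source ends up in sourceContextMap.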
private Set<String> validateSources(Set<String> channelSet) {
  //Arrays.split() call will throw NPE if the sources string is empty
  if (sources == null || sources.isEmpty()) {
    LOGGER.warn("Agent configuration for '{}' has no sources.", agentName);
    addError(CONFIG_SOURCES, PROPERTY_VALUE_NULL, WARNING);
    return new HashSet<String>();
  }
  // the source names, separated by whitespace
  Set<String> sourceSet = new HashSet<String>(Arrays.asList(sources.split("\\s+")));
  Map<String, Context> newContextMap = new HashMap<String, Context>();
  Iterator<String> iter = sourceSet.iterator();
  SourceConfiguration srcConf = null;
  // parse the configuration of each source
  while (iter.hasNext()) {
    String sourceName = iter.next();
    Context srcContext = sourceContextMap.get(sourceName);
    String config = null;
    boolean configSpecified = false;
    if (srcContext != null) {
      // read the type of the source
      SourceType srcType = getKnownSource(srcContext.getString(
          BasicConfigurationConstants.CONFIG_TYPE));
      if (srcType == null) {
        config = srcContext.getString(
            CONFIG_CONFIG);
        if (config == null || config.isEmpty()) {
          config = "OTHER";
        } else {
          configSpecified = true;
        }
      } else {
        config = srcType.toString().toUpperCase(Locale.ENGLISH);
        configSpecified = true;
      }
      // create the new configuration object for the source
      try {
        // Possible reason the configuration can fail here:
        // Old component is configured directly using Context
        srcConf = (SourceConfiguration) ComponentConfigurationFactory.create(
            sourceName, config, ComponentType.SOURCE);
        // initialize the new srcConf from the existing context
        if (srcConf != null) {
          srcConf.configure(srcContext);
          Set<String> channels = new HashSet<String>();
          if (srcConf.getChannels() != null) {
            channels.addAll(srcConf.getChannels());
          }
          channels.retainAll(channelSet);
          if (channels.isEmpty()) {
            throw new ConfigurationException(
                "No Channels configured for " + sourceName);
          }
          srcContext.put(CONFIG_CHANNELS, this.getSpaceDelimitedList(channels));
        }
        if ((configSpecified && srcConf.isNotFoundConfigClass()) || !configSpecified) {
          newContextMap.put(sourceName, srcContext);
        } else if (configSpecified) {
          // store the new configuration in sourceConfigMap
          sourceConfigMap.put(sourceName, srcConf);
        }
        if (srcConf != null) errorList.addAll(srcConf.getErrors());
      } catch (ConfigurationException e) {
        if (srcConf != null) errorList.addAll(srcConf.getErrors());
        iter.remove();
        LOGGER.warn(
            "Could not configure source {} due to: {}",
            new Object[]{sourceName, e.getMessage(), e}
        );
      }
    } else {
      iter.remove();
      addError(sourceName, CONFIG_ERROR, ERROR);
      LOGGER.warn("Configuration empty for: {}.Removed.", sourceName);
    }
  }

  // validateComponent(sourceSet, sourceConfigMap, CLASS_SOURCE, ATTR_TYPE,
  // ATTR_CHANNELS);
  sourceContextMap = newContextMap;
  Set<String> tempsourceSet = new HashSet<String>();
  tempsourceSet.addAll(sourceContextMap.keySet());
  tempsourceSet.addAll(sourceConfigMap.keySet());
  sourceSet.retainAll(tempsourceSet);
  return sourceSet;
}
  • The source types Flume supports. Skim the list for now; a few core ones will be analyzed later.
public enum SourceType implements ComponentWithClassName {
  OTHER(null),
  SEQ("org.apache.flume.source.SequenceGeneratorSource"),
  NETCAT("org.apache.flume.source.NetcatSource"),
  EXEC("org.apache.flume.source.ExecSource"),
  AVRO("org.apache.flume.source.AvroSource"),
  SYSLOGTCP("org.apache.flume.source.SyslogTcpSource"),
  MULTIPORT_SYSLOGTCP("org.apache.flume.source.MultiportSyslogTCPSource"),
  SYSLOGUDP("org.apache.flume.source.SyslogUDPSource"),
  SPOOLDIR("org.apache.flume.source.SpoolDirectorySource"),
  HTTP("org.apache.flume.source.http.HTTPSource"),
  THRIFT("org.apache.flume.source.ThriftSource"),
  JMS("org.apache.flume.source.jms.JMSSource"),
  TAILDIR("org.apache.flume.source.taildir.TaildirSource"),
  NETCATUDP("org.apache.flume.source.NetcatUdpSource");
  // ... (fields and methods omitted)
}

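  • Sink validation: validateSinks. The key point is that sinks with a configuration object are placed in sinkConfigMap, while sinks without one are placed in sinkContextMap. As before, ComponentConfigurationFactory.create builds the configuration from the sink type (the supported types are listed in the source further down). When the configuration class for a type cannot be found, create still returns a SinkConfiguration, but it is flagged isNotFoundConfigClass, so the sink ends up in sinkContextMap.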
private Set<String> validateSinks(Set<String> channelSet) {
  // Preconditions.checkArgument(channelSet != null && channelSet.size() >
  // 0);
  Map<String, Context> newContextMap = new HashMap<String, Context>();
  Set<String> sinkSet;
  SinkConfiguration sinkConf = null;
  if (sinks == null || sinks.isEmpty()) {
    LOGGER.warn("Agent configuration for '{}' has no sinks.", agentName);
    addError(CONFIG_SINKS, PROPERTY_VALUE_NULL, WARNING);
    return new HashSet<String>();
  } else {
    sinkSet = new HashSet<String>(Arrays.asList(sinks.split("\\s+")));
  }
  Iterator<String> iter = sinkSet.iterator();
  while (iter.hasNext()) {
    // iterate over the names of all sinks
    String sinkName = iter.next();
    Context sinkContext = sinkContextMap.get(sinkName.trim());
    if (sinkContext == null) {
      iter.remove();
      LOGGER.warn("no context for sink{}", sinkName);
      addError(sinkName, CONFIG_ERROR, ERROR);
    } else {
      String config = null;
      boolean configSpecified = false;
      SinkType sinkType = getKnownSink(sinkContext.getString(
          BasicConfigurationConstants.CONFIG_TYPE));
      if (sinkType == null) {
        config = sinkContext.getString(
            CONFIG_CONFIG);
        if (config == null || config.isEmpty()) {
          config = "OTHER";
        } else {
          configSpecified = true;
        }
      } else {
        config = sinkType.toString().toUpperCase(Locale.ENGLISH);
        configSpecified = true;
      }
      try {
        LOGGER.debug("Creating sink: {} using {}", sinkName, config);
        // create the SinkConfiguration object
        sinkConf = (SinkConfiguration) ComponentConfigurationFactory.create(
            sinkName, config, ComponentType.SINK);
        if (sinkConf != null) {
          // initialize the sink configuration
          sinkConf.configure(sinkContext);
        }
        if (!channelSet.contains(sinkConf.getChannel())) {
          throw new ConfigurationException("Channel " + sinkConf.getChannel() +
              " not in active set.");
        }
        if ((configSpecified && sinkConf.isNotFoundConfigClass()) || !configSpecified) {
          newContextMap.put(sinkName, sinkContext);
        } else if (configSpecified) {
          // sinkConfigMap holds the sinks that have a configuration object
          sinkConfigMap.put(sinkName, sinkConf);
        }
        if (sinkConf != null) errorList.addAll(sinkConf.getErrors());
      } catch (ConfigurationException e) {
        iter.remove();
        if (sinkConf != null) errorList.addAll(sinkConf.getErrors());
        LOGGER.warn(
            "Could not configure sink {} due to: {}",
            new Object[]{sinkName, e.getMessage(), e}
        );
      }
    }
    // Filter out any sinks that have invalid channel
  }
  // sinkContextMap is replaced here
  sinkContextMap = newContextMap;
  Set<String> tempSinkset = new HashSet<String>();
  tempSinkset.addAll(sinkConfigMap.keySet());
  tempSinkset.addAll(sinkContextMap.keySet());
  sinkSet.retainAll(tempSinkset);
  return sinkSet;
}
  • The sink types Flume supports. Skim the list for now; a few core ones will be analyzed later.
public enum SinkType implements ComponentWithClassName {
  OTHER(null),
  NULL("org.apache.flume.sink.NullSink"),
  LOGGER("org.apache.flume.sink.LoggerSink"),
  FILE_ROLL("org.apache.flume.sink.RollingFileSink"),
  HDFS("org.apache.flume.sink.hdfs.HDFSEventSink"),
  IRC("org.apache.flume.sink.irc.IRCSink"),
  AVRO("org.apache.flume.sink.AvroSink"),
  THRIFT("org.apache.flume.sink.ThriftSink"),
  ELASTICSEARCH("org.apache.flume.sink.elasticsearch.ElasticSearchSink"),
  HBASE("org.apache.flume.sink.hbase.HBaseSink"),
  ASYNCHBASE("org.apache.flume.sink.hbase.AsyncHBaseSink"),
  MORPHLINE_SOLR("org.apache.flume.sink.solr.morphline.MorphlineSolrSink"),
  HIVE("org.apache.flume.sink.hive.HiveSink"),
  HTTP("org.apache.flume.sink.http.HttpSink");
  // ... (fields and methods omitted)
}

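  • ComponentConfigurationFactory.create deserves a closer look, otherwise the object creation logic is hard to follow. create first treats type as a fully qualified class name and tries to load that class. If loading fails, which is what happens for short type names such as FILE, the catch (Exception) branch resolves type against the enum that belongs to the component kind and creates the configuration from it.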
public class ComponentConfigurationFactory {

  @SuppressWarnings("unchecked")
  public static ComponentConfiguration create(String name, String type, ComponentType component)
      throws ConfigurationException {
    Class<? extends ComponentConfiguration> confType = null;

    if (type == null) {
      throw new ConfigurationException(
          "Cannot create component without knowing its type!");
    }
    try {
      // if type names a class that loads successfully, that class is used; if the class
      // does not exist or type is a short type name, the catch branch below is taken
      confType = (Class<? extends ComponentConfiguration>) Class.forName(type);
      return confType.getConstructor(String.class).newInstance(type);
    } catch (Exception ignored) {
      try {
        // the usual settings such as type=File take this branch
        type = type.toUpperCase(Locale.ENGLISH);
        switch (component) {
          case SOURCE:
            return SourceConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH))
                .getConfiguration(name);
          case CONFIG_FILTER:
            return ConfigFilterConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH))
                .getConfiguration(name);
          case SINK:
            return SinkConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH))
                .getConfiguration(name);
          case CHANNEL:
            return ChannelConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH))
                .getConfiguration(name);
          case SINK_PROCESSOR:
            return SinkProcessorConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH))
                .getConfiguration(name);
          case CHANNELSELECTOR:
            return ChannelSelectorConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH))
                .getConfiguration(name);
          case SINKGROUP:
            return new SinkGroupConfiguration(name);
          default:
            throw new ConfigurationException(
                "Cannot create configuration. Unknown Type specified: " + type);
        }
      } catch (ConfigurationException e) {
        throw e;
      } catch (Exception e) {
        throw new ConfigurationException("Could not create configuration! " +
            " Due to " + e.getClass().getSimpleName() + ": " + e.getMessage(), e);
      }
    }
  }
}
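The "class name first, enum alias second" lookup order used by create can be reproduced in a few lines of plain Java. The sketch below is only an illustration: `TypeResolutionSketch` and its `Alias` enum are hypothetical, and the returned strings stand in for the configuration objects that the real factory returns.

```java
import java.util.Locale;

/** Hypothetical sketch of a two-stage type lookup; not Flume code. */
class TypeResolutionSketch {
    /** Stand-in for enums such as SourceConfigurationType or ChannelConfigurationType. */
    enum Alias { FILE, MEMORY }

    static String resolve(String type) {
        try {
            // Stage 1: treat the value as a fully qualified class name.
            return "class:" + Class.forName(type).getName();
        } catch (ClassNotFoundException notAClass) {
            try {
                // Stage 2: treat the value as a case-insensitive enum alias.
                return "alias:" + Alias.valueOf(type.toUpperCase(Locale.ENGLISH));
            } catch (IllegalArgumentException unknown) {
                return "unknown:" + type;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve("java.util.ArrayList"));
        System.out.println(resolve("file"));
        System.out.println(resolve("no-such-type"));
    }
}
```

One consequence of this order is that a short name like `file` always pays for a failed class lookup before the enum is consulted. That failed lookup is the Exception branch discussed above.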

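  • Now look at the getConfiguration method called in that catch branch, which is also central. We use SourceConfigurationType.getConfiguration as the example; the others follow the same logic. The source shows that the class named by srcConfigurationName may well not exist. In that case the ClassNotFoundException branch creates a plain SourceConfiguration and marks it with setNotFoundConfigClass. Channels and sinks behave the same way.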
public SourceConfiguration getConfiguration(String name)
    throws ConfigurationException {
  if (this == OTHER) {
    return new SourceConfiguration(name);
  }
  Class<? extends SourceConfiguration> clazz = null;
  SourceConfiguration instance = null;
  try {
    if (srcConfigurationName != null) {
      clazz = (Class<? extends SourceConfiguration>) Class
          .forName(srcConfigurationName);
      instance = clazz.getConstructor(String.class).newInstance(name);
    } else {
      // Could not find the configuration stub, do basic validation
      instance = new SourceConfiguration(name);
      // Let the caller know that this was created because of this exception.
      instance.setNotFoundConfigClass();
    }
  } catch (ClassNotFoundException e) {
    // when the configuration class cannot be found, this branch is taken: a plain
    // SourceConfiguration is created and marked with setNotFoundConfigClass
    // Could not find the configuration stub, do basic validation
    instance = new SourceConfiguration(name);
    // Let the caller know that this was created because of this exception.
    instance.setNotFoundConfigClass();
  } catch (Exception e) {
    throw new ConfigurationException("Error creating configuration", e);
  }
  return instance;
}
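The fallback in getConfiguration is a general pattern: try to load a specialised class reflectively, and if it is missing, return a basic object carrying a flag that tells the caller what happened. A minimal sketch, where `ConfigStubSketch`, `BasicConfig` and `SpecialConfig` are hypothetical names standing in for SourceConfiguration and its subclasses:

```java
/** Hypothetical sketch of the "not found config class" fallback; not Flume code. */
class ConfigStubSketch {

    static class BasicConfig {
        final String name;
        boolean notFoundConfigClass; // set when the specialised class could not be loaded

        BasicConfig(String name) {
            this.name = name;
        }
    }

    static class SpecialConfig extends BasicConfig {
        public SpecialConfig(String name) {
            super(name);
        }
    }

    static BasicConfig load(String configClassName, String name) {
        try {
            Class<?> clazz = Class.forName(configClassName);
            return (BasicConfig) clazz.getConstructor(String.class).newInstance(name);
        } catch (ClassNotFoundException e) {
            // No specialised class available: fall back to the basic one and flag it.
            BasicConfig fallback = new BasicConfig(name);
            fallback.notFoundConfigClass = true;
            return fallback;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Error creating configuration", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(load("no.such.ConfigClass", "r1").notFoundConfigClass);
        System.out.println(load(SpecialConfig.class.getName(), "r1").notFoundConfigClass);
    }
}
```

The flag is what the validate methods check through isNotFoundConfigClass when they decide between the config map and the context map.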

Flume object creation flow

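  • The objects are created by loadChannels, loadSources and loadSinks, added to conf through addChannel, addSourceRunner and addSinkRunner, and finally all services are started from conf. Here we focus on the load process of the three kinds of objects.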
public MaterializedConfiguration getConfiguration() {
  MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
  FlumeConfiguration fconfig = getFlumeConfiguration();
  AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
  if (agentConf != null) {
    Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
    Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
    Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
    try {
      // load the channel objects
      loadChannels(agentConf, channelComponentMap);
      // load the source objects
      loadSources(agentConf, channelComponentMap, sourceRunnerMap);
      // load the sink objects
      loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
      Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet());
      for (String channelName : channelNames) {
        ChannelComponent channelComponent = channelComponentMap.get(channelName);
        if (channelComponent.components.isEmpty()) {
          LOGGER.warn(String.format("Channel %s has no components connected" +
              " and has been removed.", channelName));
          channelComponentMap.remove(channelName);
          Map<String, Channel> nameChannelMap =
              channelCache.get(channelComponent.channel.getClass());
          if (nameChannelMap != null) {
            nameChannelMap.remove(channelName);
          }
        } else {
          LOGGER.info(String.format("Channel %s connected to %s",
              channelName, channelComponent.components.toString()));
          conf.addChannel(channelName, channelComponent.channel);
        }
      }
      for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
        conf.addSourceRunner(entry.getKey(), entry.getValue());
      }
      for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
        conf.addSinkRunner(entry.getKey(), entry.getValue());
      }
    } catch (InstantiationException ex) {
      LOGGER.error("Failed to instantiate component", ex);
    } finally {
      channelComponentMap.clear();
      sourceRunnerMap.clear();
      sinkRunnerMap.clear();
    }
  } else {
    LOGGER.warn("No configuration found for this host:{}", getAgentName());
  }
  return conf;
}
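The bookkeeping in getConfiguration (each channel remembers which sources and sinks attached to it, and channels with no attachments are dropped) can be sketched with plain collections. `WiringSketch` and its parameters are hypothetical; the lists of names stand in for ChannelComponent.components.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of channel wiring and pruning; not Flume code. */
class WiringSketch {

    /** Returns channel name -> attached component names, without unconnected channels. */
    static Map<String, List<String>> wire(List<String> channels,
                                          Map<String, List<String>> sourceToChannels,
                                          Map<String, String> sinkToChannel) {
        Map<String, List<String>> components = new LinkedHashMap<>();
        for (String channel : channels) {
            components.put(channel, new ArrayList<>());
        }
        // A source may feed several channels; register it on each one that exists.
        sourceToChannels.forEach((source, targets) -> {
            for (String channel : targets) {
                List<String> attached = components.get(channel);
                if (attached != null) {
                    attached.add(source);
                }
            }
        });
        // A sink drains exactly one channel.
        sinkToChannel.forEach((sink, channel) -> {
            List<String> attached = components.get(channel);
            if (attached != null) {
                attached.add(sink);
            }
        });
        // Channels that nothing is connected to are removed.
        components.values().removeIf(List::isEmpty);
        return components;
    }

    public static void main(String[] args) {
        Map<String, List<String>> sources = new HashMap<>();
        sources.put("r1", Arrays.asList("c1"));
        Map<String, String> sinks = new HashMap<>();
        sinks.put("k1", "c1");
        System.out.println(wire(Arrays.asList("c1", "c2"), sources, sinks));
    }
}
```

In this sketch `c2` is dropped because neither a source nor a sink references it, which corresponds to the "has no components connected" warning in the excerpt.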

Channel object initialization

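  • The logic has three parts: initialize channels from channelConfigMap, initialize channels from channelContextMap, and remove channels that are no longer needed (this last part exists to support dynamic changes to the Flume configuration). Below we look at how a single Channel is created.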
private void loadChannels(AgentConfiguration agentConf,
    Map<String, ChannelComponent> channelComponentMap)
        throws InstantiationException {
  LOGGER.info("Creating channels");

  // channelsNotReused tracks the channel objects left over from the previous configuration
  ListMultimap<Class<? extends Channel>, String> channelsNotReused =
      ArrayListMultimap.create();
  // assume all channels will not be re-used
  for (Map.Entry<Class<? extends Channel>, Map<String, Channel>> entry :
       channelCache.entrySet()) {
    Class<? extends Channel> channelKlass = entry.getKey();
    Set<String> channelNames = entry.getValue().keySet();
    channelsNotReused.get(channelKlass).addAll(channelNames);
  }

  // the names of all channels
  Set<String> channelNames = agentConf.getChannelSet();
  // channelConfigMap: channels that have a configuration object are initialized first
  Map<String, ComponentConfiguration> compMap = agentConf.getChannelConfigMap();
  /*
   * Components which have a ComponentConfiguration object
   */
  for (String chName : channelNames) {
    // look up the ComponentConfiguration; null means the channel only has a Context
    // and is handled in the second loop
    ComponentConfiguration comp = compMap.get(chName);
    if (comp != null) {
      // a reused channel is removed from channelsNotReused; a missing one is created
      Channel channel = getOrCreateChannel(channelsNotReused,
          comp.getComponentName(), comp.getType());
      try {
        // the key step: apply the configuration to the channel, which initializes it
        Configurables.configure(channel, comp);
        // channelComponentMap holds all channels
        channelComponentMap.put(comp.getComponentName(),
            new ChannelComponent(channel));
        LOGGER.info("Created channel " + chName);
      } catch (Exception e) {
        String msg = String.format("Channel %s has been removed due to an " +
            "error during configuration", chName);
        LOGGER.error(msg, e);
      }
    }
  }

  // second pass over all channel names, for channels configured through a Context only;
  // this should include channels reused from the cache
  for (String chName : channelNames) {
    // look up the Context in channelContextMap
    Context context = agentConf.getChannelContext().get(chName);
    if (context != null) {
      Channel channel = getOrCreateChannel(channelsNotReused, chName,
          context.getString(BasicConfigurationConstants.CONFIG_TYPE));
      try {
        // the key step: apply the configuration to the channel, which initializes it
        Configurables.configure(channel, context);
        channelComponentMap.put(chName, new ChannelComponent(channel));
        LOGGER.info("Created channel " + chName);
      } catch (Exception e) {
        String msg = String.format("Channel %s has been removed due to an " +
            "error during configuration", chName);
        LOGGER.error(msg, e);
      }
    }
  }

  // evict the cached channels that were not reused
  for (Class<? extends Channel> channelKlass : channelsNotReused.keySet()) {
    Map<String, Channel> channelMap = channelCache.get(channelKlass);
    if (channelMap != null) {
      for (String channelName : channelsNotReused.get(channelKlass)) {
        if (channelMap.remove(channelName) != null) {
          LOGGER.info("Removed {} of type {}", channelName, channelKlass);
        }
      }
      if (channelMap.isEmpty()) {
        channelCache.remove(channelKlass);
      }
    }
  }
}

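  • Creating a single channel takes two steps, creating the channel and configuring it, which correspond to getOrCreateChannel and configure.
  • getOrCreateChannel obtains the channel class through channelFactory.getClass(type) and then creates the object through channelFactory.create(name, type).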
private Channel getOrCreateChannel(
    ListMultimap<Class<? extends Channel>, String> channelsNotReused,
    String name, String type)
    throws FlumeException {

  Class<? extends Channel> channelClass = channelFactory.getClass(type);
  /*
   * Channel has requested a new instance on each re-configuration
   */
  if (channelClass.isAnnotationPresent(Disposable.class)) {
    Channel channel = channelFactory.create(name, type);
    channel.setName(name);
    return channel;
  }
  // channelCache is keyed by channel class; each value maps channel name -> channel instance
  Map<String, Channel> channelMap = channelCache.get(channelClass);
  if (channelMap == null) {
    channelMap = new HashMap<String, Channel>();
    channelCache.put(channelClass, channelMap);
  }
  // name is the name of the channel
  Channel channel = channelMap.get(name);
  if (channel == null) {
    // create the channel object
    channel = channelFactory.create(name, type);
    channel.setName(name);
    channelMap.put(name, channel);
  }
  // the channel is in use again, so remove it from channelsNotReused
  channelsNotReused.get(channelClass).remove(name);
  return channel;
}
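The reuse logic (assume every cached channel is stale, mark the ones that are requested again, then evict the rest) can be modelled as follows. `ChannelCacheSketch` is a hypothetical class: it uses a plain `Set<String>` of "class/name" keys instead of Guava's ListMultimap and a `Supplier` instead of the channel factory, and it does not model the Disposable annotation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical sketch of a reuse-aware instance cache; not Flume code. */
class ChannelCacheSketch {
    // class -> (name -> instance), the same two-level shape as channelCache
    private final Map<Class<?>, Map<String, Object>> cache = new HashMap<>();

    private static String key(Class<?> type, String name) {
        return type.getName() + "/" + name;
    }

    /** Keys of everything currently cached; all are assumed stale until reused. */
    Set<String> snapshot() {
        Set<String> keys = new HashSet<>();
        cache.forEach((type, byName) ->
            byName.keySet().forEach(name -> keys.add(key(type, name))));
        return keys;
    }

    /** Returns the cached instance or creates one, and marks the name as reused. */
    <T> T getOrCreate(Set<String> notReused, Class<T> type, String name, Supplier<T> factory) {
        Map<String, Object> byName = cache.computeIfAbsent(type, k -> new HashMap<>());
        Object instance = byName.computeIfAbsent(name, k -> factory.get());
        notReused.remove(key(type, name));
        return type.cast(instance);
    }

    /** Drops every instance that the new configuration did not ask for. */
    void evict(Set<String> notReused) {
        cache.forEach((type, byName) ->
            byName.keySet().removeIf(name -> notReused.contains(key(type, name))));
        cache.values().removeIf(Map::isEmpty);
    }

    public static void main(String[] args) {
        ChannelCacheSketch cache = new ChannelCacheSketch();
        cache.getOrCreate(cache.snapshot(), StringBuilder.class, "c1", StringBuilder::new);
        cache.getOrCreate(cache.snapshot(), StringBuilder.class, "c2", StringBuilder::new);

        Set<String> stale = cache.snapshot();   // a reload starts here
        cache.getOrCreate(stale, StringBuilder.class, "c1", StringBuilder::new);
        cache.evict(stale);                     // c2 was not requested again
        System.out.println(cache.snapshot());
    }
}
```

Keeping the same instance across reloads matters for channels because they can hold buffered events. Recreating them on every configuration change would discard that state.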
  • channelFactory.getClass looks up the class that corresponds to type in ChannelType; create then instantiates that class through class.newInstance().
@Override
public Channel create(String name, String type) throws FlumeException {
  Preconditions.checkNotNull(name, "name");
  Preconditions.checkNotNull(type, "type");
  logger.info("Creating instance of channel {} type {}", name, type);
  Class<? extends Channel> channelClass = getClass(type);
  try {
    return channelClass.newInstance();
  } catch (Exception ex) {
    throw new FlumeException("Unable to create channel: " + name
        + ", type: " + type + ", class: " + channelClass.getName(), ex);
  }
}

@SuppressWarnings("unchecked")
@Override
public Class<? extends Channel> getClass(String type) throws FlumeException {
  String channelClassName = type;
  ChannelType channelType = ChannelType.OTHER;
  try {
    channelType = ChannelType.valueOf(type.toUpperCase(Locale.ENGLISH));
  } catch (IllegalArgumentException ex) {
    logger.debug("Channel type {} is a custom type", type);
  }
  if (!channelType.equals(ChannelType.OTHER)) {
    channelClassName = channelType.getChannelClassName();
  }
  try {
    return (Class<? extends Channel>) Class.forName(channelClassName);
  } catch (Exception ex) {
    throw new FlumeException("Unable to load channel type: " + type
        + ", class: " + channelClassName, ex);
  }
}
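Note that the factory resolves types in the opposite order from ComponentConfigurationFactory.create: it tries the enum alias first and only falls back to treating the value as a class name for custom types. A small sketch of that pattern, using JDK collection classes as stand-ins for channel implementations (`AliasFactorySketch` and its `Alias` enum are hypothetical):

```java
import java.util.Locale;

/** Hypothetical sketch of "alias first, class name second" instantiation; not Flume code. */
class AliasFactorySketch {
    /** Stand-in for ChannelType: OTHER means "custom type, use the value as a class name". */
    enum Alias {
        OTHER(null),
        LIST("java.util.ArrayList"),
        MAP("java.util.HashMap");

        final String className;

        Alias(String className) {
            this.className = className;
        }
    }

    static Class<?> classFor(String type) {
        String className = type;
        Alias alias = Alias.OTHER;
        try {
            alias = Alias.valueOf(type.toUpperCase(Locale.ENGLISH));
        } catch (IllegalArgumentException custom) {
            // Not a built-in alias: keep treating the value as a class name.
        }
        if (alias != Alias.OTHER) {
            className = alias.className;
        }
        try {
            return Class.forName(className);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unable to load type: " + type, e);
        }
    }

    static Object create(String type) {
        try {
            return classFor(type).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to create: " + type, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(create("list").getClass().getName());
        System.out.println(create("java.util.LinkedList").getClass().getName());
    }
}
```

The sketch calls `getDeclaredConstructor().newInstance()` rather than `Class.newInstance()`, because the latter has been deprecated since Java 9. The excerpt above uses the older call.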
  • ChannelType is defined as follows.
public enum ChannelType implements ComponentWithClassName {
  OTHER(null),
  FILE("org.apache.flume.channel.file.FileChannel"),
  MEMORY("org.apache.flume.channel.MemoryChannel"),
  JDBC("org.apache.flume.channel.jdbc.JdbcChannel"),
  // ... (remainder omitted)
}
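  • Configurables.configure is what configures the channel. It delegates to the configure method of the concrete channel, and the overload that runs depends on the argument passed in (a Context or a ComponentConfiguration). The excerpt below follows the call chain for the JDBC channel.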
// ---------- step 1: Configurables.configure dispatches on the argument type ----------
public static boolean configure(Object target, Context context) {
  if (target instanceof Configurable) {
    ((Configurable) target).configure(context);
    return true;
  }
  return false;
}

public static boolean configure(Object target, ComponentConfiguration conf) {
  if (target instanceof ConfigurableComponent) {
    ((ConfigurableComponent) target).configure(conf);
    return true;
  }
  return false;
}

// ---------- step 2: the channel's own configure method (JDBC channel) ----------
@Override
public void configure(Context context) {
  provider = JdbcChannelProviderFactory.getProvider(context, getName());
  LOG.info("JDBC Channel initialized: " + getName());
}

// ---------- step 3: JdbcChannelProviderFactory.getProvider ----------
public static synchronized JdbcChannelProvider getProvider(
    Context context, String name) {
  if (PROVIDER == null) {
    PROVIDER = new JdbcChannelProviderImpl();
    PROVIDER.initialize(context);
  }
  if (!INSTANCES.add(name)) {
    throw new JdbcChannelException("Attempt to initialize multiple "
         + "channels with same name: " + name);
  }
  return PROVIDER;
}

// ---------- step 4: the provider initializes itself from the context ----------
public void initialize(Context context) {
  LOGGER.debug("Initializing JDBC Channel provider");
  initializeSystemProperties(context);
  initializeDataSource(context);
  initializeSchema(context);
  initializeChannelState(context);
}

// method bodies omitted
private void initializeSystemProperties(Context context) {}
private void initializeChannelState(Context context) {}
private void initializeSchema(Context context) {}
private void initializeDataSource(Context context) {}
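The dispatch in step 1 is an ordinary instanceof check in front of an interface call. The sketch below reproduces it with a `Map<String, String>` in place of Flume's Context. `ConfigureDispatchSketch`, its nested `Configurable` interface and `MemoryLikeChannel` are hypothetical names, and the `capacity` key with a default of 100 is an assumption chosen for the example.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the configure dispatch; not Flume code. */
class ConfigureDispatchSketch {

    /** Stand-in for Flume's Configurable, with a plain map as the context. */
    interface Configurable {
        void configure(Map<String, String> context);
    }

    /** Configures the target if it supports configuration; reports whether it did. */
    static boolean configure(Object target, Map<String, String> context) {
        if (target instanceof Configurable) {
            ((Configurable) target).configure(context);
            return true;
        }
        return false;
    }

    static class MemoryLikeChannel implements Configurable {
        int capacity;

        @Override
        public void configure(Map<String, String> context) {
            // Each component reads only the keys it understands and applies its own defaults.
            capacity = Integer.parseInt(context.getOrDefault("capacity", "100"));
        }
    }

    public static void main(String[] args) {
        Map<String, String> context = new HashMap<>();
        context.put("capacity", "500");
        MemoryLikeChannel channel = new MemoryLikeChannel();
        System.out.println(configure(channel, context) + " " + channel.capacity);
    }
}
```

Returning a boolean instead of throwing lets the caller pass any object and simply learn whether configuration was applied, which is how the two overloads in the excerpt behave.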

Source object initialization

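  • Creating a source is very similar to creating a channel: sourceFactory.create creates the source object and Configurables.configure configures it. The only extra step is that the source has to be linked to its channels. Because the process is so similar, we do not follow the details any further.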
private void loadSources(AgentConfiguration agentConf,
    Map<String, ChannelComponent> channelComponentMap,
    Map<String, SourceRunner> sourceRunnerMap)
    throws InstantiationException {

  Set<String> sourceNames = agentConf.getSourceSet();
  Map<String, ComponentConfiguration> compMap =
      agentConf.getSourceConfigMap();
  /*
   * Components which have a ComponentConfiguration object
   */
  for (String sourceName : sourceNames) {
    ComponentConfiguration comp = compMap.get(sourceName);
    if (comp != null) {
      SourceConfiguration config = (SourceConfiguration) comp;

      // create the source object
      Source source = sourceFactory.create(comp.getComponentName(),
          comp.getType());
      try {
        // configure the source
        Configurables.configure(source, config);
        // the source config names the channels it is connected to
        Set<String> channelNames = config.getChannels();
        List<Channel> sourceChannels = new ArrayList<Channel>();
        for (String chName : channelNames) {
          ChannelComponent channelComponent = channelComponentMap.get(chName);
          if (channelComponent != null) {
            sourceChannels.add(channelComponent.channel);
          }
        }
        // the source is not connected to any channel
        if (sourceChannels.isEmpty()) {
          String msg = String.format("Source %s is not connected to a " +
              "channel", sourceName);
          throw new IllegalStateException(msg);
        }
        // the source configuration contains a ChannelSelectorConfiguration
        ChannelSelectorConfiguration selectorConfig =
            config.getSelectorConfiguration();

        // create the ChannelSelector; the default is the replicating selector
        ChannelSelector selector = ChannelSelectorFactory.create(
            sourceChannels, selectorConfig);

        // the ChannelProcessor wraps the selector and the interceptors and is
        // configured from config
        ChannelProcessor channelProcessor = new ChannelProcessor(selector);
        Configurables.configure(channelProcessor, config);

        // attach the channelProcessor to the source
        source.setChannelProcessor(channelProcessor);
        // a runner is created for the source
        sourceRunnerMap.put(comp.getComponentName(),
            SourceRunner.forSource(source));
        // record the reverse link from each channel back to the source
        for (Channel channel : sourceChannels) {
          ChannelComponent channelComponent =
              Preconditions.checkNotNull(channelComponentMap.get(channel.getName()),
                  String.format("Channel %s", channel.getName()));
          channelComponent.components.add(sourceName);
        }
      } catch (Exception e) {
        String msg = String.format("Source %s has been removed due to an " +
            "error during configuration", sourceName);
        LOGGER.error(msg, e);
      }
    }
  }
  /*
   * Components which DO NOT have a ComponentConfiguration object
   * and use only Context
   */
  Map<String, Context> sourceContexts = agentConf.getSourceContext();
  for (String sourceName : sourceNames) {
    Context context = sourceContexts.get(sourceName);
    if (context != null) {
      Source source =
          sourceFactory.create(sourceName,
              context.getString(BasicConfigurationConstants.CONFIG_TYPE));
      try {
        Configurables.configure(source, context);
        List<Channel> sourceChannels = new ArrayList<Channel>();
        String[] channelNames = context.getString(
            BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+");
        for (String chName : channelNames) {
          ChannelComponent channelComponent = channelComponentMap.get(chName);
          if (channelComponent != null) {
            sourceChannels.add(channelComponent.channel);
          }
        }
        if (sourceChannels.isEmpty()) {
          String msg = String.format("Source %s is not connected to a " +
              "channel", sourceName);
          throw new IllegalStateException(msg);
        }
        Map<String, String> selectorConfig = context.getSubProperties(
            BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);

        ChannelSelector selector = ChannelSelectorFactory.create(
            sourceChannels, selectorConfig);

        ChannelProcessor channelProcessor = new ChannelProcessor(selector);
        Configurables.configure(channelProcessor, context);
        source.setChannelProcessor(channelProcessor);
        sourceRunnerMap.put(sourceName,
            SourceRunner.forSource(source));
        for (Channel channel : sourceChannels) {
          ChannelComponent channelComponent =
              Preconditions.checkNotNull(channelComponentMap.get(channel.getName()),
                  String.format("Channel %s", channel.getName()));
          channelComponent.components.add(sourceName);
        }
      } catch (Exception e) {
        String msg = String.format("Source %s has been removed due to an " +
            "error during configuration", sourceName);
        LOGGER.error(msg, e);
      }
    }
  }
}

Sink object initialization

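  • Creating a sink is also very similar to creating a channel: sinkFactory.create creates the sink object and Configurables.configure configures it. The only extra step is that the sink has to be linked to its channel. Because the process is so similar, we do not follow the details any further.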
private void loadSinks(AgentConfiguration agentConf,
    Map<String, ChannelComponent> channelComponentMap, Map<String, SinkRunner> sinkRunnerMap)
    throws InstantiationException {
  // the set of sink names
  Set<String> sinkNames = agentConf.getSinkSet();
  Map<String, ComponentConfiguration> compMap =
      agentConf.getSinkConfigMap();
  Map<String, Sink> sinks = new HashMap<String, Sink>();
  /*
   * Components which have a ComponentConfiguration object
   */
  for (String sinkName : sinkNames) {
    // create the sink object from its configuration
    ComponentConfiguration comp = compMap.get(sinkName);
    if (comp != null) {
      SinkConfiguration config = (SinkConfiguration) comp;
      Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType());
      try {
        // configure the sink object
        Configurables.configure(sink, config);
        // find the channel the sink is attached to
        ChannelComponent channelComponent = channelComponentMap.get(config.getChannel());
        if (channelComponent == null) {
          String msg = String.format("Sink %s is not connected to a " +
              "channel", sinkName);
          throw new IllegalStateException(msg);
        }
        // set the channel on the sink
        sink.setChannel(channelComponent.channel);
        sinks.put(comp.getComponentName(), sink);
        channelComponent.components.add(sinkName);
      } catch (Exception e) {
        String msg = String.format("Sink %s has been removed due to an " +
            "error during configuration", sinkName);
        LOGGER.error(msg, e);
      }
    }
  }
  /*
   * Components which DO NOT have a ComponentConfiguration object
   * and use only Context
   * (handles the sinks that have no configuration object)
   */
  Map<String, Context> sinkContexts = agentConf.getSinkContext();
  for (String sinkName : sinkNames) {
    Context context = sinkContexts.get(sinkName);
    if (context != null) {
      Sink sink = sinkFactory.create(sinkName, context.getString(
          BasicConfigurationConstants.CONFIG_TYPE));
      try {
        Configurables.configure(sink, context);
        ChannelComponent channelComponent =
            channelComponentMap.get(
                context.getString(BasicConfigurationConstants.CONFIG_CHANNEL));
        if (channelComponent == null) {
          String msg = String.format("Sink %s is not connected to a " +
              "channel", sinkName);
          throw new IllegalStateException(msg);
        }
        sink.setChannel(channelComponent.channel);
        sinks.put(sinkName, sink);
        channelComponent.components.add(sinkName);
      } catch (Exception e) {
        String msg = String.format("Sink %s has been removed due to an " +
            "error during configuration", sinkName);
        LOGGER.error(msg, e);
      }
    }
  }

  loadSinkGroups(agentConf, sinks, sinkRunnerMap);
}

Reposted from: http://bnnkm.baihongyu.com/
