本文共 36504 字,大约阅读时间需要 121 分钟。
虽然在上一篇博文中我们已经了解了flume相关组件的配置加载以及启动过程,但却遗漏了组件初始化的过程,也就是说缺少了根据配置生成组件的过程,这篇文章就是为了弥补这一部分。
希望通过这篇文章我们能够了解如何根据解析完的配置生成source、channel、sink这三个组件。后面会再写一篇博文,针对每个组件各举一个例子来说明组件的启动过程,这样组件的初始化和启动就都讲解清楚了;当然,本篇文章还是着重于讲清楚组件的初始化过程。
public FlumeConfiguration(Mapproperties) { agentConfigMap = new HashMap<>(); errors = new LinkedList<>(); //step1 负责解析配置文件,生成source、channel、sink的配置信息。 for (Entry entry : properties.entrySet()) { if (!addRawProperty(entry.getKey(), entry.getValue())) { LOGGER.warn("Configuration property ignored: {} = {}", entry.getKey(), entry.getValue()); } } //step2 根据上一步的配置进行根据source、channel、sink的属性进一步进行解析。 validateConfiguration(); }
public static class AgentConfiguration { //step1当中初始化的变量 private final String agentName; private String configFilters; private String sources; private String sinks; private String channels; private String sinkgroups; //step2当中初始化的变量 private final MapsourceConfigMap; private final Map sinkConfigMap; private final Map channelConfigMap; private final Map sinkgroupConfigMap; private final Map configFilterConfigMap; //step1当中初始化的变量 private Map configFilterContextMap; private Map sourceContextMap; private Map sinkContextMap; private Map channelContextMap; private Map sinkGroupContextMap; //step2当中初始化的变量 private Set sinkSet; private Set configFilterSet; private Set sourceSet; private Set channelSet; private Set sinkgroupSet; private final List errorList; private List configFiltersInstances; private Map configFilterPatternCache;
private void validateConfiguration() { Set> entries = agentConfigMap.entrySet(); Iterator > it = entries.iterator(); while (it.hasNext()) { Entry next = it.next(); String agentName = next.getKey(); AgentConfiguration aconf = next.getValue(); //todo aconf是agent的配置文件,我们对整个配置文件进行校验 if (!aconf.isValid()) { LOGGER.warn("Agent configuration invalid for agent '{}'. It will be removed.", agentName); addError(agentName, AGENT_CONFIGURATION_INVALID, ERROR); it.remove(); } LOGGER.debug("Channels:{}\n", aconf.channels); LOGGER.debug("Sinks {}\n", aconf.sinks); LOGGER.debug("Sources {}\n", aconf.sources); }
private boolean isValid() { LOGGER.debug("Starting validation of configuration for agent: {}", agentName); if (LOGGER.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) { LOGGER.debug("Initial configuration: {}", getPrevalidationConfig()); } configFilterSet = validateConfigFilterSet(); createConfigFilters(); runFiltersThroughConfigs(); // Make sure that at least one channel is specified if (channels == null || channels.trim().isEmpty()) { LOGGER.warn( "Agent configuration for '{}' does not contain any channels. Marking it as invalid.", agentName ); addError(CONFIG_CHANNELS, PROPERTY_VALUE_NULL, ERROR); return false; } //todo 这里用于解析所有channel的名字,\\s+代表空格等分隔符 channelSet = new HashSet<>(Arrays.asList(channels.split("\\s+"))); //todo 核心在于验证里面的channel channelSet = validateChannels(channelSet); if (channelSet.isEmpty()) { LOGGER.warn( "Agent configuration for '{}' does not contain any valid channels. " + "Marking it as invalid.", agentName ); addError(CONFIG_CHANNELS, PROPERTY_VALUE_NULL, ERROR); return false; } //todo 核心的处理source】channel、sink的逻辑 sourceSet = validateSources(channelSet); sinkSet = validateSinks(channelSet); sinkgroupSet = validateGroups(sinkSet); // If no sources or sinks are present, then this is invalid if (sourceSet.isEmpty() && sinkSet.isEmpty()) { LOGGER.warn( "Agent configuration for '{}' has no sources or sinks. Will be marked invalid.", agentName ); addError(CONFIG_SOURCES, PROPERTY_VALUE_NULL, ERROR); addError(CONFIG_SINKS, PROPERTY_VALUE_NULL, ERROR); return false; } // Now rewrite the sources/sinks/channels this.configFilters = getSpaceDelimitedList(configFilterSet); sources = getSpaceDelimitedList(sourceSet); channels = getSpaceDelimitedList(channelSet); sinks = getSpaceDelimitedList(sinkSet); sinkgroups = getSpaceDelimitedList(sinkgroupSet); if (LOGGER.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) { LOGGER.debug("Post validation configuration for {}", agentName); LOGGER.debug(getPostvalidationConfig()); } return true; }
private SetvalidateChannels(Set channelSet) { Iterator iter = channelSet.iterator(); Map newContextMap = new HashMap<>(); ChannelConfiguration conf = null; //针对每个channel进行分析 while (iter.hasNext()) { String channelName = iter.next(); //todo channelContextMap保存了所有的配置信息 Context channelContext = channelContextMap.get(channelName); // Context exists in map. if (channelContext != null) { //todo 正常情况这里取的type是file,所以后面走的是else分支 ChannelType chType = getKnownChannel(channelContext.getString( BasicConfigurationConstants.CONFIG_TYPE)); boolean configSpecified = false; String config = null; if (chType == null) { // 省略不重要的代码 } else { config = chType.toString().toUpperCase(Locale.ENGLISH); configSpecified = true; } try { // 根据flume的ChannelType创建对应的配置文件ChannelConfiguration并根据channelContext进行初始化。 conf = (ChannelConfiguration) ComponentConfigurationFactory.create( channelName, config, ComponentType.CHANNEL); //根据原来的channelContext重新初始化conf对象。 if (conf != null) { conf.configure(channelContext); } //没有相关配置信息的channel放在newContextMap当中。 if ((configSpecified && conf.isNotFoundConfigClass()) || !configSpecified) { newContextMap.put(channelName, channelContext); } else if (configSpecified) { //有配置信息的channel放在channelConfigMap当中 channelConfigMap.put(channelName, conf); } if (conf != null) { errorList.addAll(conf.getErrors()); } } } } //channelContextMap保存了没有配置信息的channel,channelConfigMap保存有配置信息的channel。 channelContextMap = newContextMap; Set tempchannelSet = new HashSet (); tempchannelSet.addAll(channelConfigMap.keySet()); tempchannelSet.addAll(channelContextMap.keySet()); channelSet.retainAll(tempchannelSet); return channelSet; }
/**
 * Built-in channel type aliases, each paired with a channel class name.
 * {@code OTHER} carries {@code null} and stands for a type that is not one
 * of the built-in aliases.
 */
public enum ChannelType implements ComponentWithClassName {
  OTHER(null),
  FILE("org.apache.flume.channel.file.FileChannel"),
  MEMORY("org.apache.flume.channel.MemoryChannel"),
  JDBC("org.apache.flume.channel.jdbc.JdbcChannel"),
  SPILLABLEMEMORY("org.apache.flume.channel.SpillableMemoryChannel");
  // The rest of the enum (field, constructor, accessor) is not shown in this excerpt.
private SetvalidateSources(Set channelSet) { //Arrays.split() call will throw NPE if the sources string is empty if (sources == null || sources.isEmpty()) { LOGGER.warn("Agent configuration for '{}' has no sources.", agentName); addError(CONFIG_SOURCES, PROPERTY_VALUE_NULL, WARNING); return new HashSet (); } //todo 空格进行分割的sources Set sourceSet = new HashSet (Arrays.asList(sources.split("\\s+"))); Map newContextMap = new HashMap (); Iterator iter = sourceSet.iterator(); SourceConfiguration srcConf = null; //todo 遍历每个sources进行配置的解析 while (iter.hasNext()) { String sourceName = iter.next(); Context srcContext = sourceContextMap.get(sourceName); String config = null; boolean configSpecified = false; if (srcContext != null) { //todo 获取sources的type SourceType srcType = getKnownSource(srcContext.getString( BasicConfigurationConstants.CONFIG_TYPE)); if (srcType == null) { config = srcContext.getString( CONFIG_CONFIG); if (config == null || config.isEmpty()) { config = "OTHER"; } else { configSpecified = true; } } else { config = srcType.toString().toUpperCase(Locale.ENGLISH); configSpecified = true; } //todo 创建新的sources的配置信息 try { // Possible reason the configuration can fail here: // Old component is configured directly using Context srcConf = (SourceConfiguration) ComponentConfigurationFactory.create( sourceName, config, ComponentType.SOURCE); //todo 用旧的配置来初始化新的srcConf配置 if (srcConf != null) { srcConf.configure(srcContext); Set channels = new HashSet (); if (srcConf.getChannels() != null) { channels.addAll(srcConf.getChannels()); } channels.retainAll(channelSet); if (channels.isEmpty()) { throw new ConfigurationException( "No Channels configured for " + sourceName); } srcContext.put(CONFIG_CHANNELS, this.getSpaceDelimitedList(channels)); } if ((configSpecified && srcConf.isNotFoundConfigClass()) || !configSpecified) { newContextMap.put(sourceName, srcContext); } else if (configSpecified) { //todo 把最新的配置放置到sourceConfigMap当中 sourceConfigMap.put(sourceName, srcConf); } if 
(srcConf != null) errorList.addAll(srcConf.getErrors()); } catch (ConfigurationException e) { if (srcConf != null) errorList.addAll(srcConf.getErrors()); iter.remove(); LOGGER.warn( "Could not configure source {} due to: {}", new Object[]{sourceName, e.getMessage(), e} ); } } else { iter.remove(); addError(sourceName, CONFIG_ERROR, ERROR); LOGGER.warn("Configuration empty for: {}.Removed.", sourceName); } } // validateComponent(sourceSet, sourceConfigMap, CLASS_SOURCE, ATTR_TYPE, // ATTR_CHANNELS); sourceContextMap = newContextMap; Set tempsourceSet = new HashSet (); tempsourceSet.addAll(sourceContextMap.keySet()); tempsourceSet.addAll(sourceConfigMap.keySet()); sourceSet.retainAll(tempsourceSet); return sourceSet; }
/**
 * Built-in source type aliases, each paired with a source class name;
 * {@code OTHER} carries {@code null}.
 * NOTE(review): presumably resolved the same way as the channel aliases — confirm.
 */
public enum SourceType implements ComponentWithClassName {
  OTHER(null),
  SEQ("org.apache.flume.source.SequenceGeneratorSource"),
  NETCAT("org.apache.flume.source.NetcatSource"),
  EXEC("org.apache.flume.source.ExecSource"),
  AVRO("org.apache.flume.source.AvroSource"),
  SYSLOGTCP("org.apache.flume.source.SyslogTcpSource"),
  MULTIPORT_SYSLOGTCP("org.apache.flume.source.MultiportSyslogTCPSource"),
  SYSLOGUDP("org.apache.flume.source.SyslogUDPSource"),
  SPOOLDIR("org.apache.flume.source.SpoolDirectorySource"),
  HTTP("org.apache.flume.source.http.HTTPSource"),
  THRIFT("org.apache.flume.source.ThriftSource"),
  JMS("org.apache.flume.source.jms.JMSSource"),
  TAILDIR("org.apache.flume.source.taildir.TaildirSource"),
  NETCATUDP("org.apache.flume.source.NetcatUdpSource")
  // The rest of the enum is not shown in this excerpt.
private SetvalidateSinks(Set channelSet) { // Preconditions.checkArgument(channelSet != null && channelSet.size() > // 0); Map newContextMap = new HashMap (); Set sinkSet; SinkConfiguration sinkConf = null; if (sinks == null || sinks.isEmpty()) { LOGGER.warn("Agent configuration for '{}' has no sinks.", agentName); addError(CONFIG_SINKS, PROPERTY_VALUE_NULL, WARNING); return new HashSet (); } else { sinkSet = new HashSet (Arrays.asList(sinks.split("\\s+"))); } Iterator iter = sinkSet.iterator(); while (iter.hasNext()) { //todo 这里在遍历所有sink的名字 String sinkName = iter.next(); Context sinkContext = sinkContextMap.get(sinkName.trim()); if (sinkContext == null) { iter.remove(); LOGGER.warn("no context for sink{}", sinkName); addError(sinkName, CONFIG_ERROR, ERROR); } else { String config = null; boolean configSpecified = false; SinkType sinkType = getKnownSink(sinkContext.getString( BasicConfigurationConstants.CONFIG_TYPE)); if (sinkType == null) { config = sinkContext.getString( CONFIG_CONFIG); if (config == null || config.isEmpty()) { config = "OTHER"; } else { configSpecified = true; } } else { config = sinkType.toString().toUpperCase(Locale.ENGLISH); configSpecified = true; } try { LOGGER.debug("Creating sink: {} using {}", sinkName, config); //todo 创建SinkConfigration对象 sinkConf = (SinkConfiguration) ComponentConfigurationFactory.create( sinkName, config, ComponentType.SINK); if (sinkConf != null) { //todo 初始化sink配置 sinkConf.configure(sinkContext); } if (!channelSet.contains(sinkConf.getChannel())) { throw new ConfigurationException("Channel " + sinkConf.getChannel() + " not in active set."); } if ((configSpecified && sinkConf.isNotFoundConfigClass()) || !configSpecified) { newContextMap.put(sinkName, sinkContext); } else if (configSpecified) { //todo sinkConfigMap保存了有配置的sink的配置 sinkConfigMap.put(sinkName, sinkConf); } if (sinkConf != null) errorList.addAll(sinkConf.getErrors()); } catch (ConfigurationException e) { iter.remove(); if (sinkConf != null) 
errorList.addAll(sinkConf.getErrors()); LOGGER.warn( "Could not configure sink {} due to: {}", new Object[]{sinkName, e.getMessage(), e} ); } } // Filter out any sinks that have invalid channel } //todo 重置了sinkContextMap对象 sinkContextMap = newContextMap; Set tempSinkset = new HashSet (); tempSinkset.addAll(sinkConfigMap.keySet()); tempSinkset.addAll(sinkContextMap.keySet()); sinkSet.retainAll(tempSinkset); return sinkSet; }
/**
 * Built-in sink type aliases, each paired with a sink class name;
 * {@code OTHER} carries {@code null}.
 * NOTE(review): presumably resolved the same way as the channel aliases — confirm.
 */
public enum SinkType implements ComponentWithClassName {
  OTHER(null),
  NULL("org.apache.flume.sink.NullSink"),
  LOGGER("org.apache.flume.sink.LoggerSink"),
  FILE_ROLL("org.apache.flume.sink.RollingFileSink"),
  HDFS("org.apache.flume.sink.hdfs.HDFSEventSink"),
  IRC("org.apache.flume.sink.irc.IRCSink"),
  AVRO("org.apache.flume.sink.AvroSink"),
  THRIFT("org.apache.flume.sink.ThriftSink"),
  ELASTICSEARCH("org.apache.flume.sink.elasticsearch.ElasticSearchSink"),
  HBASE("org.apache.flume.sink.hbase.HBaseSink"),
  ASYNCHBASE("org.apache.flume.sink.hbase.AsyncHBaseSink"),
  MORPHLINE_SOLR("org.apache.flume.sink.solr.morphline.MorphlineSolrSink"),
  HIVE("org.apache.flume.sink.hive.HiveSink"),
  HTTP("org.apache.flume.sink.http.HttpSink");
  // The rest of the enum is not shown in this excerpt.
public class ComponentConfigurationFactory { @SuppressWarnings("unchecked") public static ComponentConfiguration create(String name, String type, ComponentType component) throws ConfigurationException { Class confType = null; if (type == null) { throw new ConfigurationException( "Cannot create component without knowing its type!"); } try { //todo type如果是指定类且加载成功就用这个类,如果类不存在或者指定的是类型,那么就走的Exception分支。 confType = (Class ) Class.forName(type); return confType.getConstructor(String.class).newInstance(type); } catch (Exception ignored) { try { //todo 我们正常配置的type=File之类的走的是这个分支 type = type.toUpperCase(Locale.ENGLISH); switch (component) { case SOURCE: return SourceConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH)) .getConfiguration(name); case CONFIG_FILTER: return ConfigFilterConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH)) .getConfiguration(name); case SINK: return SinkConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH)) .getConfiguration(name); case CHANNEL: return ChannelConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH)) .getConfiguration(name); case SINK_PROCESSOR: return SinkProcessorConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH)) .getConfiguration(name); case CHANNELSELECTOR: return ChannelSelectorConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH)) .getConfiguration(name); case SINKGROUP: return new SinkGroupConfiguration(name); default: throw new ConfigurationException( "Cannot create configuration. Unknown Type specified: " + type); } } catch (ConfigurationException e) { throw e; } catch (Exception e) { throw new ConfigurationException("Could not create configuration! " + " Due to " + e.getClass().getSimpleName() + ": " + e.getMessage(), e); } } }}
public SourceConfiguration getConfiguration(String name) throws ConfigurationException { if (this == OTHER) { return new SourceConfiguration(name); } Class clazz = null; SourceConfiguration instance = null; try { if (srcConfigurationName != null) { clazz = (Class ) Class .forName(srcConfigurationName); instance = clazz.getConstructor(String.class).newInstance(name); } else { // Could not find the configuration stub, do basic validation instance = new SourceConfiguration(name); // Let the caller know that this was created because of this exception. instance.setNotFoundConfigClass(); } } catch (ClassNotFoundException e) { //todo 因为上面的类都没有找到,所以应该走的这个分支,创建了SourceConfiguration对象并设置setNotFoundConfigClass // Could not find the configuration stub, do basic validation instance = new SourceConfiguration(name); // Let the caller know that this was created because of this exception. instance.setNotFoundConfigClass(); } catch (Exception e) { throw new ConfigurationException("Error creating configuration", e); } return instance; }
public MaterializedConfiguration getConfiguration() { MaterializedConfiguration conf = new SimpleMaterializedConfiguration(); FlumeConfiguration fconfig = getFlumeConfiguration(); AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName()); if (agentConf != null) { MapchannelComponentMap = Maps.newHashMap(); Map sourceRunnerMap = Maps.newHashMap(); Map sinkRunnerMap = Maps.newHashMap(); try { //todo 加载channel对象 loadChannels(agentConf, channelComponentMap); //todo 加载source对象 loadSources(agentConf, channelComponentMap, sourceRunnerMap); //todo 加载sink对象 loadSinks(agentConf, channelComponentMap, sinkRunnerMap); Set channelNames = new HashSet (channelComponentMap.keySet()); for (String channelName : channelNames) { ChannelComponent channelComponent = channelComponentMap.get(channelName); if (channelComponent.components.isEmpty()) { LOGGER.warn(String.format("Channel %s has no components connected" + " and has been removed.", channelName)); channelComponentMap.remove(channelName); Map nameChannelMap = channelCache.get(channelComponent.channel.getClass()); if (nameChannelMap != null) { nameChannelMap.remove(channelName); } } else { LOGGER.info(String.format("Channel %s connected to %s", channelName, channelComponent.components.toString())); conf.addChannel(channelName, channelComponent.channel); } } for (Map.Entry entry : sourceRunnerMap.entrySet()) { conf.addSourceRunner(entry.getKey(), entry.getValue()); } for (Map.Entry entry : sinkRunnerMap.entrySet()) { conf.addSinkRunner(entry.getKey(), entry.getValue()); } } catch (InstantiationException ex) { LOGGER.error("Failed to instantiate component", ex); } finally { channelComponentMap.clear(); sourceRunnerMap.clear(); sinkRunnerMap.clear(); } } else { LOGGER.warn("No configuration found for this host:{}", getAgentName()); } return conf; }
private void loadChannels(AgentConfiguration agentConf, MapchannelComponentMap) throws InstantiationException { LOGGER.info("Creating channels"); //todo channelsNotReused记录旧的channel对象 ListMultimap , String> channelsNotReused = ArrayListMultimap.create(); // assume all channels will not be re-used for (Map.Entry , Map > entry : channelCache.entrySet()) { Class channelKlass = entry.getKey(); Set channelNames = entry.getValue().keySet(); channelsNotReused.get(channelKlass).addAll(channelNames); } //todo 获取所有channel的名字 Set channelNames = agentConf.getChannelSet(); //todo channelConfigMap获取配置,获取有配置信息的channel并进行初始化 Map compMap = agentConf.getChannelConfigMap(); /* * Components which have a ComponentConfiguration object */ for (String chName : channelNames) { //todo 从compMap中找对象,getChannelSet,获取没有配置信息的channel并进行初始化 ComponentConfiguration comp = compMap.get(chName); if (comp != null) { //todo 会把需要重用的channel从channelsNotReused移除表明已经重用了,同时新增新的channel。 Channel channel = getOrCreateChannel(channelsNotReused, comp.getComponentName(), comp.getType()); try { //todo 核心地方在于这里负责向channel加载配置,相当于初始化channel对象 Configurables.configure(channel, comp); //todo channelComponentMap保存着所有的channel channelComponentMap.put(comp.getComponentName(), new ChannelComponent(channel)); LOGGER.info("Created channel " + chName); } catch (Exception e) { String msg = String.format("Channel %s has been removed due to an " + "error during configuration", chName); LOGGER.error(msg, e); } } } //todo 负责把所有的channel执行以下配置,这里应该包括旧的channel for (String chName : channelNames) { //todo 从ChannelContext中找对象,channelContextMap获取配置 Context context = agentConf.getChannelContext().get(chName); if (context != null) { Channel channel = getOrCreateChannel(channelsNotReused, chName, context.getString(BasicConfigurationConstants.CONFIG_TYPE)); try { //todo 核心地方在于这里负责向channel加载配置,相当于初始化channel对象 Configurables.configure(channel, context); channelComponentMap.put(chName, new ChannelComponent(channel)); LOGGER.info("Created channel " + 
chName); } catch (Exception e) { String msg = String.format("Channel %s has been removed due to an " + "error during configuration", chName); LOGGER.error(msg, e); } } } //todo 移除不需要的配置信息 for (Class channelKlass : channelsNotReused.keySet()) { Map channelMap = channelCache.get(channelKlass); if (channelMap != null) { for (String channelName : channelsNotReused.get(channelKlass)) { if (channelMap.remove(channelName) != null) { LOGGER.info("Removed {} of type {}", channelName, channelKlass); } } if (channelMap.isEmpty()) { channelCache.remove(channelKlass); } } } }
private Channel getOrCreateChannel( ListMultimap, String> channelsNotReused, String name, String type) throws FlumeException { Class channelClass = channelFactory.getClass(type); /* * Channel has requested a new instance on each re-configuration */ if (channelClass.isAnnotationPresent(Disposable.class)) { Channel channel = channelFactory.create(name, type); channel.setName(name); return channel; } //todo channelCache是以channel的class作为key,value为map(key为channel的name,value为channel的实例) Map channelMap = channelCache.get(channelClass); if (channelMap == null) { channelMap = new HashMap (); channelCache.put(channelClass, channelMap); } //todo name代表的是channel的名字 Channel channel = channelMap.get(name); if (channel == null) { //todo 创建channel对象 channel = channelFactory.create(name, type); channel.setName(name); channelMap.put(name, channel); } //todo 从channelsNotReused移除旧的channel对象 channelsNotReused.get(channelClass).remove(name); return channel; }
/**
 * Instantiates a channel of the given type through its no-arg constructor.
 *
 * @param name channel name, used for logging and error messages only
 * @param type channel type alias or fully qualified class name
 * @return a new channel instance
 * @throws FlumeException if the class cannot be loaded or instantiated
 */
@Override
public Channel create(String name, String type) throws FlumeException {
  Preconditions.checkNotNull(name, "name");
  Preconditions.checkNotNull(type, "type");
  logger.info("Creating instance of channel {} type {}", name, type);
  Class<? extends Channel> channelClass = getClass(type);
  try {
    return channelClass.newInstance();
  } catch (Exception ex) {
    throw new FlumeException("Unable to create channel: " + name
        + ", type: " + type + ", class: " + channelClass.getName(), ex);
  }
}

/**
 * Resolves a channel type to its implementation class.
 *
 * <p>If {@code type} matches a {@code ChannelType} constant other than
 * {@code OTHER}, that constant's class name is used. Otherwise {@code type}
 * itself is treated as a class name.
 *
 * @param type channel type alias or fully qualified class name
 * @return the channel implementation class
 * @throws FlumeException if the class cannot be loaded
 */
@SuppressWarnings("unchecked")
@Override
public Class<? extends Channel> getClass(String type) throws FlumeException {
  String channelClassName = type;
  ChannelType channelType = ChannelType.OTHER;
  try {
    channelType = ChannelType.valueOf(type.toUpperCase(Locale.ENGLISH));
  } catch (IllegalArgumentException ex) {
    // Not a built-in alias: fall through and load type as a class name.
    logger.debug("Channel type {} is a custom type", type);
  }
  if (!channelType.equals(ChannelType.OTHER)) {
    channelClassName = channelType.getChannelClassName();
  }
  try {
    return (Class<? extends Channel>) Class.forName(channelClassName);
  } catch (Exception ex) {
    throw new FlumeException("Unable to load channel type: " + type
        + ", class: " + channelClassName, ex);
  }
}
/**
 * Abridged repeat of the channel type aliases; each constant carries the
 * class name of a channel implementation, {@code OTHER} carries {@code null}.
 */
public enum ChannelType implements ComponentWithClassName {
  OTHER(null),
  FILE("org.apache.flume.channel.file.FileChannel"),
  MEMORY("org.apache.flume.channel.MemoryChannel"),
  JDBC("org.apache.flume.channel.jdbc.JdbcChannel"),
  // Remaining constants and members are not shown in this excerpt.
}
---------------------*****step1********--------------------------------------public static boolean configure(Object target, Context context) { if (target instanceof Configurable) { ((Configurable) target).configure(context); return true; } return false; } public static boolean configure(Object target, ComponentConfiguration conf) { if (target instanceof ConfigurableComponent) { ((ConfigurableComponent) target).configure(conf); return true; } return false; } ---------------------*****step2********-------------------------------------- @Override public void configure(Context context) { provider = JdbcChannelProviderFactory.getProvider(context, getName()); LOG.info("JDBC Channel initialized: " + getName()); } ---------------------*****step3********-------------------------------------- public static synchronized JdbcChannelProvider getProvider( Context context, String name) { if (PROVIDER == null) { PROVIDER = new JdbcChannelProviderImpl(); PROVIDER.initialize(context); } if (!INSTANCES.add(name)) { throw new JdbcChannelException("Attempt to initialize multiple " + "channels with same name: " + name); } return PROVIDER; }---------------------*****step4********--------------------------------------public void initialize(Context context) { LOGGER.debug("Initializing JDBC Channel provider"); initializeSystemProperties(context); initializeDataSource(context); initializeSchema(context); initializeChannelState(context); } private void initializeSystemProperties(Context context) {} private void initializeChannelState(Context context) {} private void initializeSchema(Context context) {} private void initializeDataSource(Context context) {}
private void loadSources(AgentConfiguration agentConf, MapchannelComponentMap, Map sourceRunnerMap) throws InstantiationException { Set sourceNames = agentConf.getSourceSet(); Map compMap = agentConf.getSourceConfigMap(); /* * Components which have a ComponentConfiguration object */ for (String sourceName : sourceNames) { ComponentConfiguration comp = compMap.get(sourceName); if (comp != null) { SourceConfiguration config = (SourceConfiguration) comp; //todo 创建一个source对象 Source source = sourceFactory.create(comp.getComponentName(), comp.getType()); try { //todo 配置sources Configurables.configure(source, config); //todo 通过source的config来获取对应的channel的信息,也就是说连接信息 Set channelNames = config.getChannels(); List sourceChannels = new ArrayList (); for (String chName : channelNames) { ChannelComponent channelComponent = channelComponentMap.get(chName); if (channelComponent != null) { sourceChannels.add(channelComponent.channel); } } //todo source没有连接任何的channel if (sourceChannels.isEmpty()) { String msg = String.format("Source %s is not connected to a " + "channel", sourceName); throw new IllegalStateException(msg); } //todo sources的配置当中包含ChannelSelectorConfiguration ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration(); //todo 创建ChannelSelector对象,默认是复制的selector ChannelSelector selector = ChannelSelectorFactory.create( sourceChannels, selectorConfig); //todo 其中包括了selector以及拦截器对象,并通过config设置channelProcessor ChannelProcessor channelProcessor = new ChannelProcessor(selector); Configurables.configure(channelProcessor, config); //todo 关联channelProcessor到source当中 source.setChannelProcessor(channelProcessor); //todo 内部创建了runner sourceRunnerMap.put(comp.getComponentName(), SourceRunner.forSource(source)); //todo 反关联channel到source的连接 for (Channel channel : sourceChannels) { ChannelComponent channelComponent = Preconditions.checkNotNull(channelComponentMap.get(channel.getName()), String.format("Channel %s", channel.getName())); 
channelComponent.components.add(sourceName); } } catch (Exception e) { String msg = String.format("Source %s has been removed due to an " + "error during configuration", sourceName); LOGGER.error(msg, e); } } } /* * Components which DO NOT have a ComponentConfiguration object * and use only Context */ Map sourceContexts = agentConf.getSourceContext(); for (String sourceName : sourceNames) { Context context = sourceContexts.get(sourceName); if (context != null) { Source source = sourceFactory.create(sourceName, context.getString(BasicConfigurationConstants.CONFIG_TYPE)); try { Configurables.configure(source, context); List sourceChannels = new ArrayList (); String[] channelNames = context.getString( BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+"); for (String chName : channelNames) { ChannelComponent channelComponent = channelComponentMap.get(chName); if (channelComponent != null) { sourceChannels.add(channelComponent.channel); } } if (sourceChannels.isEmpty()) { String msg = String.format("Source %s is not connected to a " + "channel", sourceName); throw new IllegalStateException(msg); } Map selectorConfig = context.getSubProperties( BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX); ChannelSelector selector = ChannelSelectorFactory.create( sourceChannels, selectorConfig); ChannelProcessor channelProcessor = new ChannelProcessor(selector); Configurables.configure(channelProcessor, context); source.setChannelProcessor(channelProcessor); sourceRunnerMap.put(sourceName, SourceRunner.forSource(source)); for (Channel channel : sourceChannels) { ChannelComponent channelComponent = Preconditions.checkNotNull(channelComponentMap.get(channel.getName()), String.format("Channel %s", channel.getName())); channelComponent.components.add(sourceName); } } catch (Exception e) { String msg = String.format("Source %s has been removed due to an " + "error during configuration", sourceName); LOGGER.error(msg, e); } } } }
private void loadSinks(AgentConfiguration agentConf, MapchannelComponentMap, Map sinkRunnerMap) throws InstantiationException { //todo 获取sink的集合 Set sinkNames = agentConf.getSinkSet(); Map compMap = agentConf.getSinkConfigMap(); Map sinks = new HashMap (); /* * Components which have a ComponentConfiguration object */ for (String sinkName : sinkNames) { //todo 根据sink的配置创建sink对象 ComponentConfiguration comp = compMap.get(sinkName); if (comp != null) { SinkConfiguration config = (SinkConfiguration) comp; Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType()); try { //todo 配置sink对象 Configurables.configure(sink, config); //todo 找到sink关联的channel ChannelComponent channelComponent = channelComponentMap.get(config.getChannel()); if (channelComponent == null) { String msg = String.format("Sink %s is not connected to a " + "channel", sinkName); throw new IllegalStateException(msg); } //todo sink对象设置channel对象 sink.setChannel(channelComponent.channel); sinks.put(comp.getComponentName(), sink); channelComponent.components.add(sinkName); } catch (Exception e) { String msg = String.format("Sink %s has been removed due to an " + "error during configuration", sinkName); LOGGER.error(msg, e); } } } /* * Components which DO NOT have a ComponentConfiguration object * and use only Context * 处理没有配置信息的sink对象 */ Map sinkContexts = agentConf.getSinkContext(); for (String sinkName : sinkNames) { Context context = sinkContexts.get(sinkName); if (context != null) { Sink sink = sinkFactory.create(sinkName, context.getString( BasicConfigurationConstants.CONFIG_TYPE)); try { Configurables.configure(sink, context); ChannelComponent channelComponent = channelComponentMap.get( context.getString(BasicConfigurationConstants.CONFIG_CHANNEL)); if (channelComponent == null) { String msg = String.format("Sink %s is not connected to a " + "channel", sinkName); throw new IllegalStateException(msg); } sink.setChannel(channelComponent.channel); sinks.put(sinkName, sink); 
channelComponent.components.add(sinkName); } catch (Exception e) { String msg = String.format("Sink %s has been removed due to an " + "error during configuration", sinkName); LOGGER.error(msg, e); } } } loadSinkGroups(agentConf, sinks, sinkRunnerMap); }
转载地址:http://bnnkm.baihongyu.com/