• 让中国高铁领跑世界——我国高铁装备业唯一女总工程师梁建英 2019-04-19
  • eHub发布“鲁班”智能营销云 重塑智能时代的商业变革 2019-04-18
  • 拜博口腔医疗集团创始人、董事长黎昌仁获第十二届人民企业社会责任奖年度人物奖 2019-04-18
  • 巴川中学王苗:留守儿童长大了 2019-04-15
  • 真正学进去 积极讲出来 扎实做起来市委常委部门分别召开会议传达学习全国两会精神 2019-04-15
  • 实拍武汉万人“父母相亲会”  现场“摆摊”征婚 2019-04-13
  • 丸子-热门标签-华商生活 2019-04-13
  • 房奴!房奴!亚历山大幸福吗? 2019-04-06
  • 我在吃饭,重点在那?重点在饭,难道我只能吃饭吗?看着就想笑 2019-04-06
  • 神州专车爱心助考 30城上线高考“神”助攻专车 2019-04-04
  • 《朝圣之路》第三季直面邪教争议性问题 2019-04-04
  • 冰箱除异味的七个小窍门 2019-03-31
  • 更别忘了马克思对共产主义设定的另一个条件:劳动不再是谋生的手段。这一点哈儿是怎么也不能理解的,哈儿嘛,夏虫不可语冰嘛。 2019-03-31
  • 俄方:美方毁坏俄领事机构大门驱动装置进入搜查 2019-03-29
  • 女性之声——全国妇联 2019-03-29
  • flume进阶

    广东十一选5一定牛 www.aavbg.com 上一张初识里面谢了一些flume入门的内容,其实在真正工作环境里面这种情况使用的是很少的,大部分情况,我们可能需要从多台设备的日志里面汇总收集数据并存储到HDFS上,以便于后期对数据进行处理,真实的情况可能是这样的,分别根据不同的消息来源进行不同的处理,不同的存储..

     

         

      上面只是一个大致情况,一般情况下,我们会将Flume里面可以做的还有很多,大批量的日志数据我们不能说都不加以处理就直接推送出去,一般的,我们会在数据源头对数据进行过滤(即对source进行属性配置),目前来说采用日志数据过滤的方式有两种,一种是Flume自带的拦截器,另外就是我们自己自定义拦截器,下面就分别采用这两种方式实现以下日志过滤相关工作;

    1.采用Flume自带的过滤器进行过滤,目前小生了解到的Flume自带的拦截器分别有,详见 //flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-interceptors

    Timestamp Interceptor

    Host Interceptor

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    a1.sources.r1.interceptors = i1 i2
    ## 这里是给事件的Header中添加一个Host标识 a1.sources.r1.interceptors.i1.type
    = org.apache.flume.interceptor.HostInterceptor$Builder a1.sources.r1.interceptors.i1.preserveExisting = false a1.sources.r1.interceptors.i1.hostHeader = hostname
    ## 这里是给事件添加一个时间戳 a1.sources.r1.interceptors.i2.type
    = org.apache.flume.interceptor.TimestampInterceptor$Builder a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d a1.sinks.k1.channel = c1

     

    Static Interceptor(用于给Header中添加静态值数据)

    Remove Header Interceptor(用于删除匹配的Header数据)

    UUID Interceptor(用于给被截取的所有事件设置一个通用的唯一标识符)

    Search and Replace Interceptor(用于对数据字符串的查找替换功能)

    a1.sources.avroSrc.interceptors = search-replace
    a1.sources.avroSrc.interceptors.search-replace.type = search_replace
    
    # Remove leading alphanumeric characters in an event body.
    a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+
    a1.sources.avroSrc.interceptors.search-replace.replaceString = 
     此段代码用于去除字母和数字

    Regex Filtering Interceptor(此拦截器通过将事件体解释为文本并根据配置的正则表达式匹配文本来选择性地筛选事件。提供的正则表达式可用于包括事件或排除事件)

    Regex Extractor Interceptor(此拦截器使用指定的正则表达式提取regex匹配组,并将匹配组作为事件的标题追加。它还支持可插入的序列化程序,以便在将匹配组添加为事件头之前对其进行格式化)

       具体配置办法详见 //flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-interceptors

    2.自定义拦截器

    2.1. 编写自定义过滤器 

    2.1.1 新建一个maven项目,引入一下依赖

    <properties>

      <version.flume>1.7.0</version.flume>

    </properties>

    -------------------------
    <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>${version.flume}</version> </dependency> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-configuration</artifactId> <version>${version.flume}</version> </dependency>

    2.1.2 新建一个MyInterceptor 继承Interceptor 类

    import com.google.common.base.Charsets;
    import com.google.common.collect.Lists;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.util.List;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    import static org.apache.flume.interceptor.RegexFilteringInterceptor.Constants.DEFAULT_REGEX;
    import static org.apache.flume.interceptor.RegexFilteringInterceptor.Constants.REGEX;
    
    /**
     * Created by yangyibo on 17/1/6.
     */
    public class MyInterceptor  implements Interceptor {
        private final Pattern regex;
    
        private MyInterceptor(Pattern regex) {
            this.regex = regex;
        }
    
        @Override
        public void initialize() {
            // NO-OP...
        }
    
        @Override
        public void close() {
            // NO-OP...
        }
        //    JAVA中用于处理字符串常用的有三个类:
    //
    //    java.lang.String、
    //
    //    java.lang.StringBuffer、
    //
    //    java.lang.StringBuilder,
    //
    //    这三者的共同之处都是 final 类,不允许被继承,这主要是从性能和安全性上考虑的,因为这几个类都是经常被使用着的,且考虑到防止其中的参数被修改影响到其它的应用。
    //
    //    StringBuffer 与 StringBuilder 两个基本上差不多,只是 StringBuffer 是线程安全,可以不需要额外的同步用于多线程中;
    //
    //    StringBuilder 是非同步,运行于多线程中就需要使用着单独同步处理,但是速度就比 StringBuffer 快多了;二者之间的共同点都可以通过append、insert进行字符串的操作。
    //
    //    String 实现了三个接口:Serializable、Comparable<String>、CharSequence,
    //
    //    而 StringBuffer 及 StringBuilder 只实现了两个接口 Serializable、CharSequence,相比之下 String 的实例可以通过 compareTo 方法进行比较,而其它两个就不可以。
        @Override
        public Event intercept(Event event) {
            String body = new String(event.getBody(), Charsets.UTF_8);
    
    
            //匹配日志信息中以 Parsing events: 为开头关键字,以END 为结尾关键字 的日志信息段
            String pattern= "(Parsing events)(.*)(END)";
            // 创建 Pattern 对象
            Pattern r= Pattern.compile(pattern);
            // 现在创建 matcher 对象
          Matcher m= r.matcher(body);
            StringBuffer bodyoutput = new StringBuffer();
            if(m.find())//此处可以用多个正则进行匹配,多条件可以用&& 或者|| 的方式约束连接。
            {
                //多个正则匹配后可以将多个匹配的结果append 到bodyoutput
                bodyoutput=bodyoutput.append(m.group(0));
                event.setBody(bodyoutput.toString().getBytes());
            }else{
                event=null;
            }
            return event;
        }
    
        @Override
        public List<Event> intercept(List<Event> events) {
            List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
            for (Event event : events) {
                Event interceptedEvent = intercept(event);
                if (interceptedEvent != null) {
                    intercepted.add(interceptedEvent);
                }
            }
            return intercepted;
        }
    
        public static class Builder implements Interceptor.Builder {
            private Pattern regex;
            //使用Builder初始化Interceptor
            @Override
            public Interceptor build() {
                return new MyInterceptor(regex);
            }
    
            @Override
            public void configure(Context context) {
                String regexString = context.getString(REGEX, DEFAULT_REGEX);
                regex = Pattern.compile(regexString);
            }
        }
    }

    2.1.3 将此maven项目打成jar包,将jar 包放到flume的lib 目录下。

    关于jar 包管理请看 flume自定义组件的 jar 包管理


    b. 编写flume配置文件

    #filter
    a1.sources.r1.interceptors=i1
    a1.sources.r1.interceptors.i1.type=regex_filter
    #a1.sources.r1.interceptors.i1.type= com.us.MyInterceptor$Builder
    a1.sources.r1.interceptors.i1.regex=(Parsing events)(.*)(END)
    
    此处略去sink和channel

    至此,自定义拦截器已经操作完成。

    以上内容仅为个人拙见,如有错误,还请指正,不胜感激

     

    posted @ 2019-03-16 19:21 秋风催落叶 阅读(...) 评论(...) 编辑 收藏
  • 让中国高铁领跑世界——我国高铁装备业唯一女总工程师梁建英 2019-04-19
  • eHub发布“鲁班”智能营销云 重塑智能时代的商业变革 2019-04-18
  • 拜博口腔医疗集团创始人、董事长黎昌仁获第十二届人民企业社会责任奖年度人物奖 2019-04-18
  • 巴川中学王苗:留守儿童长大了 2019-04-15
  • 真正学进去 积极讲出来 扎实做起来市委常委部门分别召开会议传达学习全国两会精神 2019-04-15
  • 实拍武汉万人“父母相亲会”  现场“摆摊”征婚 2019-04-13
  • 丸子-热门标签-华商生活 2019-04-13
  • 房奴!房奴!亚历山大幸福吗? 2019-04-06
  • 我在吃饭,重点在那?重点在饭,难道我只能吃饭吗?看着就想笑 2019-04-06
  • 神州专车爱心助考 30城上线高考“神”助攻专车 2019-04-04
  • 《朝圣之路》第三季直面邪教争议性问题 2019-04-04
  • 冰箱除异味的七个小窍门 2019-03-31
  • 更别忘了马克思对共产主义设定的另一个条件:劳动不再是谋生的手段。这一点哈儿是怎么也不能理解的,哈儿嘛,夏虫不可语冰嘛。 2019-03-31
  • 俄方:美方毁坏俄领事机构大门驱动装置进入搜查 2019-03-29
  • 女性之声——全国妇联 2019-03-29