Flume Interceptor

就是拦截器,拦截器可以将flume收集到的event进行拦截,并使用对应的拦截器,对event进行简单修改,过滤。同时可以配置多个拦截器实现不同的功能,按照配置的先后顺序进行拦截处理

timestamp Interceptor 给event的头信息中添加时间戳

Static Interceptor 给event的头信息中添加自定义键值

Host Interceptor 给event的头信息中添加主机名或者ip信息

Search and Replace Interceptor 拦截信息进行匹配和替换

Regex File

timestamp interceptor(添加时间戳)

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 44444

==a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp==

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

host interceptor(添加主机信息)

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 44444

==a1.sources.r1.interceptors = i1 ==
==a1.sources.r1.interceptors.i1.type = host==

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Static Interceptor

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 44444

a1.sources.r1.interceptors = i1 i2 i3

a1.sources.r1.interceptors.i3.type = static
a1.sources.r1.interceptors.i3.key = name
a1.sources.r1.interceptors.i3.value = zhangsan

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Search and Replace Interceptor

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 44444

==a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = search_replace
a1.sources.r1.interceptors.i1.searchPattern=[a-z]
a1.sources.r1.interceptors.i1.replaceString=*==

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Regex Filering Interceptor(过滤的用正则)

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 44444

==a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex=^jp.*
a1.sources.r1.interceptors.i1.excludeEvents=true==

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Regex Extractor Interceptor(通过正则对event进行捕获)

a1.sources = r1 a1.channels = c1 a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 44444

==a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex=(^[a-zA-Z]====)\s([0-9]====$)
a1.sources.r1.interceptors.i1.serializers=s1 s2
a1.sources.r1.interceptors.i1.serializers.s1.name=word
a1.sources.r1.interceptors.i1.serializers.s2.name=num==

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1