当前位置: 首页 > news >正文

地产设计网站搜索引擎优化seo怎么做

地产设计网站,搜索引擎优化seo怎么做,wordpress是什么程序,网站横幅广告怎么做前置知识: Flume学习笔记(1)—— Flume入门-CSDN博客 Flume学习笔记(2)—— Flume进阶-CSDN博客 Flume 自定义组件 自定义 Interceptor 需求分析:使用 Flume 采集服务器本地日志,需要按照日志…

前置知识:

Flume学习笔记(1)—— Flume入门-CSDN博客

Flume学习笔记(2)—— Flume进阶-CSDN博客

Flume 自定义组件

自定义 Interceptor

需求分析:使用 Flume 采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统

需要使用Flume 拓扑结构中的 Multiplexing 结构,Multiplexing的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel中,所以我们需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予不同的值

实现流程:

代码

导入依赖:

<dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version></dependency>
</dependencies>

自定义拦截器的代码:

package com.why.interceptor;import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.util.ArrayList;
import java.util.List;
import java.util.Map;public class TypeInterceptor implements Interceptor {//存放事件的集合private List<Event> addHeaderEvents;@Overridepublic void initialize() {//初始化集合addHeaderEvents = new ArrayList<>();}//单个事件拦截@Overridepublic Event intercept(Event event) {//获取头信息Map<String, String> headers = event.getHeaders();//获取body信息String body = new String(event.getBody());//根据数据中是否包含”why“来分组if(body.contains("why")){headers.put("type","first");}else {headers.put("type","second");}return event;}//批量事件拦截@Overridepublic List<Event> intercept(List<Event> events) {//清空集合addHeaderEvents.clear();//遍历eventsfor(Event event : events){//给每一个事件添加头信息addHeaderEvents.add(intercept(event));}return addHeaderEvents;}@Overridepublic void close() {}//构建生成器public static class TypeBuilder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new TypeInterceptor();}@Overridepublic void configure(Context context) {}}}

将代码打包放入flume安装路径下的lib文件夹中

配置文件

在job文件夹下创建group4目录,添加配置文件;

为 hadoop102 上的 Flume1 配置 1 个 netcat source,1 个 sink group(2 个 avro sink),并配置相应的 ChannelSelector 和 interceptor

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.why.interceptor.TypeInterceptor$TypeBuilder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.first = c1
a1.sources.r1.selector.mapping.second = c2# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

hadoop103:配置一个 avro source 和一个 logger sink

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

hadoop104:配置一个 avro source 和一个 logger sink

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

执行指令

hadoop102:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume-interceptor-flume.conf

hadoop103:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume1-flume-logger.conf -Dflume.root.logger=INFO,console

hadoop104:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume2-flume-logger.conf -Dflume.root.logger=INFO,console

然后hadoop102通过nc连接44444端口,发送数据:

在hadoop103和104上分别接收到:

自定义 Source

官方提供的文档:Flume 1.11.0 Developer Guide — Apache Flume

给出的示例代码如下:

public class MySource extends AbstractSource implements Configurable, PollableSource {private String myProp;@Overridepublic void configure(Context context) {String myProp = context.getString("myProp", "defaultValue");// Process the myProp value (e.g. validation, convert to another type, ...)// Store myProp for later retrieval by process() methodthis.myProp = myProp;}@Overridepublic void start() {// Initialize the connection to the external client}@Overridepublic void stop () {// Disconnect from external client and do any additional cleanup// (e.g. releasing resources or nulling-out field values) ..}@Overridepublic Status process() throws EventDeliveryException {Status status = null;try {// This try clause includes whatever Channel/Event operations you want to do// Receive new dataEvent e = getSomeData();// Store the Event into this Source's associated Channel(s)getChannelProcessor().processEvent(e);status = Status.READY;} catch (Throwable t) {// Log exception, handle individual exceptions as neededstatus = Status.BACKOFF;// re-throw all Errorsif (t instanceof Error) {throw (Error)t;}} finally {txn.close();}return status;}
}

需要继承AbstractSource,实现Configurable, PollableSource

实战需求分析

使用 flume 接收数据,并给每条数据添加前缀,输出到控制台。前缀可从 flume 配置文件中配置

代码

package com.why.source;import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;import java.util.HashMap;
import java.util.concurrent.ConcurrentMap;public class MySource extends AbstractSource implements PollableSource, Configurable {//定义配置文件将来要读取的字段private Long delay;private String field;//获取数据封装成 event 并写入 channel,这个方法将被循环调用@Overridepublic Status process() throws EventDeliveryException {try {//事件头信息HashMap<String,String> headerMap = new HashMap<>();//创建事件SimpleEvent event = new SimpleEvent();//循环封装事件for (int i = 0; i < 5; i++) {//设置头信息event.setHeaders(headerMap);//设置事件内容event.setBody((field + i).getBytes());//将事件写入ChannelgetChannelProcessor().processEvent(event);Thread.sleep(delay);}}catch (InterruptedException e) {throw new RuntimeException(e);}return Status.READY;}//backoff 步长@Overridepublic long getBackOffSleepIncrement() {return 0;}//backoff 最长时间@Overridepublic long getMaxBackOffSleepInterval() {return 0;}//初始化 context(读取配置文件内容)@Overridepublic void configure(Context context) {delay = context.getLong("delay");field = context.getString("field","Hello");}}

打包放到flume安装路径下的lib文件夹中;

配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = com.why.source.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.field = why# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

执行指令

hadoop102上:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group5/mysource.conf -Dflume.root.logger=INFO,console

结果如下:

自定义 Sink

Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。

Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件

官方文档:Flume 1.11.0 Developer Guide — Apache Flume

接口实例:

public class MySink extends AbstractSink implements Configurable {private String myProp;@Overridepublic void configure(Context context) {String myProp = context.getString("myProp", "defaultValue");// Process the myProp value (e.g. validation)// Store myProp for later retrieval by process() methodthis.myProp = myProp;}@Overridepublic void start() {// Initialize the connection to the external repository (e.g. HDFS) that// this Sink will forward Events to ..}@Overridepublic void stop () {// Disconnect from the external respository and do any// additional cleanup (e.g. releasing resources or nulling-out// field values) ..}@Overridepublic Status process() throws EventDeliveryException {Status status = null;// Start transactionChannel ch = getChannel();Transaction txn = ch.getTransaction();txn.begin();try {// This try clause includes whatever Channel operations you want to doEvent event = ch.take();// Send the Event to the external repository.// storeSomeData(e);txn.commit();status = Status.READY;} catch (Throwable t) {txn.rollback();// Log exception, handle individual exceptions as neededstatus = Status.BACKOFF;// re-throw all Errorsif (t instanceof Error) {throw (Error)t;}}return status;}
}

自定义MySink 需要继承 AbstractSink 类并实现 Configurable 接口

实战需求分析

使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置

代码

package com.why.sink;import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MySink extends AbstractSink implements Configurable {//创建 Logger 对象private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class);//前后缀private String prefix;private String suffix;@Overridepublic Status process() throws EventDeliveryException {//声明返回值状态信息Status status;//获取当前 Sink 绑定的 ChannelChannel ch = getChannel();//获取事务Transaction txn = ch.getTransaction();//声明事件Event event;//开启事务txn.begin();//读取 Channel 中的事件,直到读取到事件结束循环while (true) {event = ch.take();if (event != null) {break;}}try {//处理事件(打印)LOG.info(prefix + new String(event.getBody()) + suffix);//事务提交txn.commit();status = Status.READY;} catch (Exception e) {//遇到异常,事务回滚txn.rollback();status = Status.BACKOFF;} finally {//关闭事务txn.close();}return status;}@Overridepublic void configure(Context context) {prefix = context.getString("prefix", "hello");suffix = context.getString("suffix");}
}

打包放到flume安装路径下的lib文件夹中;

配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# Describe the sink
a1.sinks.k1.type = com.why.sink.MySink
a1.sinks.k1.prefix = why:
a1.sinks.k1.suffix = :why# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

执行指令

hadoop102上:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group6/mysink.conf -Dflume.root.logger=INFO,console

结果如下:

http://www.tj-hxxt.cn/news/123939.html

相关文章:

  • 镜像网站是如何做的达州seo
  • 网站怎么做三个页面职业培训学校
  • 毕节城乡建设局网站查询网站建设规划书
  • 网站建设分为几类网上推广赚钱方法
  • 新建html网站深圳网络推广建站
  • 网站安全建设申请百度公司招聘岗位
  • 陕西建设监理协会网站免费外链生成器
  • 常用的网站推广方法有哪些培训行业seo整站优化
  • 万网域名控制台上海网站推广优化
  • 做么户网站怎么去前置审批app下载
  • 微信推送在哪个网站做产品推广的渠道
  • 个人网站做影视二级网站怎么做
  • 做公司网站需要什么材料网店代运营公司哪家好
  • 深圳民治网站建设刚刚北京传来重大消息
  • 电子商务网站建设规划实践成果百度网站排名查询
  • 微信网站建设费用计入什么科目北京seo外包平台
  • 北京梦创义网站建设蜜雪冰城推广软文
  • 一级域名网站企业软文营销
  • 池州网站建设公司seo发展前景怎么样啊
  • 网站建设页面设计之后关键字搜索
  • 我想做个软件平台要多少钱seo网络营销招聘
  • 上海的网站建设潍坊网站开发公司
  • 为什么做营销型网站百度百科优化
  • 织梦后台怎么做网站地图seo是免费的吗
  • 淘宝联盟建网站进入百度首页
  • 最专业网站建网店运营是做什么的
  • 小学门户网站建设情况汇报自助建站平台
  • 瑞安哪里有做百度的网站上海seo优化公司kinglink
  • 广州电子商城网站建设优化大师手机版
  • 南京小程序开发网站制灰色词网站seo