wordpress网站设置关键词,山东住房和城乡建设部网站,外贸网站推广哪个平台好,12389举报网站建设项目文章目录 定义一个示例高级特性装饰器概述简单案例多种装饰方式 无框架请求概述使用方式 阻塞任务处理器背景概述多种使用方式 rpc 异常统一处理使用方式更详细的异常信息 Armeria 提供 gRPC 客户端多种调用方式同步调用异步调用使用装饰器 负载均衡简单案例Armeria 提供的所有… 文章目录 定义一个示例高级特性装饰器概述简单案例多种装饰方式 无框架请求概述使用方式 阻塞任务处理器背景概述多种使用方式 rpc 异常统一处理使用方式更详细的异常信息 Armeria 提供 gRPC 客户端多种调用方式同步调用异步调用使用装饰器 负载均衡简单案例Armeria 提供的所有负载均衡策略进阶使用 Nacos 集成 Armeria概述实现步骤 文档服务概述实现步骤 定义一个示例 Note本文所讲的所有特性围绕此例展开 1定义一个简单的 proto
syntax proto3;package org.cyk.armeria.grpc.hello;
option java_package org.cyk.armeria.grpc.hello;service HelloService {rpc Hello (HelloReq) returns (HelloResp) {}
}message HelloReq {string name 1;
}message HelloResp {string msg 1;
}2实现服务端
class HelloServiceGrpcFacade: HelloServiceImplBase() {override fun hello(request: Hello.HelloReq,responseObserver: StreamObserverHelloResp) {val resp HelloResp.newBuilder().setMsg(hello ${request.name} ~).build()responseObserver.onNext(resp)responseObserver.onCompleted()}}3服务启动配置
object ArmeriaGrpcBean {fun newServer(port: Int): Server {return Server.builder().http(port) // 1.配置端口号.service(GrpcService.builder().addService(HelloServiceGrpcFacade()) // 2.添加服务示例.build()).build()}}companion object {private lateinit var stub: HelloServiceBlockingStubprivate lateinit var server: ServerJvmStaticBeforeAllfun beforeAll() {server ArmeriaGrpcBean.newServer(9000)server.start()//这里启动不是异步的所以不用 Thread.sleep 等待stub GrpcClients.newClient(http://127.0.0.1:9000/,HelloServiceBlockingStub::class.java,)}}高级特性 装饰器
概述
装饰器主要作用是为了给 服务 或 方法 添加切面逻辑也就是说在不改变核心业务逻辑的情况下添加例如 日志、监控、限流、身份认证 功能最大的好处就是统一处理逻辑复用.
简单案例
例如在调用 HelloServiceGrpcFacade 下的 hello 方法时记录一下日志 那么首先需要先实现一个自定义装饰器
Armeria 默认提供了一些装饰器例如 专门处理日志的 com.linecorp.armeria.server.logging.LoggingService但是为了满足客制化我就根据 LoggingService 源码实现了一个自定义的装饰器
/*** 自定义装饰器* author yikang.chen*/
class CustomDecorator(delegate: HttpService,
) : SimpleDecoratingHttpService(delegate) {companion object {private val log LoggerFactory.getLogger(CustomDecorator::class.java)/*** 这里为了迎合 Armeria 的 Java API只能先这样处理*/fun newDecorator(): Functionin HttpService, out CustomDecorator {return Function { delegate -CustomDecorator(delegate)}}}override fun serve(ctx: ServiceRequestContext, req: HttpRequest): HttpResponse {log.info()log.info(收到客户端 rpc header: ${req.headers()})req.aggregate().thenApply { req -log.info(收到客户端 rpc body: ${req.contentUtf8()})}log.info()return unwrap().serve(ctx, req)}}然后添加到服务配置中 fun newServer(port: Int): Server {return Server.builder().http(port).service(GrpcService.builder().addService(HelloServiceGrpcFacade()).build(),listOf(CustomDecorator.newDecorator()) // ).build()}客户端调用如下 Testfun test1() {val req HelloReq.newBuilder().setName(cyk).build()val resp stub.hello(req)assertTrue { resp.msg.isNotBlank() }}在执行结果中就可以看到 装饰器 的处理信息 23:25:42.779 [armeria-common-worker-nio-3-3] INFO component.CustomDecorator -- 收到客户端 rpc header: [:methodPOST, :authority127.0.0.1:9000, :schemehttp, :path/org.cyk.armeria.grpc.hello.HelloService/Hello, content-typeapplication/grpcproto, tetrailers, grpc-accept-encodinggzip, grpc-timeout15000000u, user-agentarmeria/1.30.1, content-length10]
23:25:42.781 [armeria-common-worker-nio-3-3] INFO component.CustomDecorator -- 收到客户端 rpc body: cyk
23:25:42.781 [armeria-common-worker-nio-3-3] INFO component.CustomDecorator -- 多种装饰方式
1在 GrpcServiceBuilder 中给单个服务指定装饰器 fun newServer(port: Int): Server {return Server.builder().http(port).service(GrpcService.builder().addService(HelloServiceGrpcFacade(), listOf(CustomDecorator.newDecorator())) // .build()).build()}2直接在服务类或者方法上使用 Decorator
Decorator(Custom2::class) // 对该类下的所有方法都管用
class HelloServiceGrpcFacade: HelloServiceImplBase() {Decorator(Custom2::class) // 仅对该方法管用override fun hello(request: Hello.HelloReq,responseObserver: StreamObserverHelloResp) {val resp HelloResp.newBuilder().setMsg(hello ${request.name} ~).build()responseObserver.onNext(resp)responseObserver.onCompleted()}}值得注意的是要使用这种注解的方式那么自定义的装饰器必须要实现 DecoratingHttpServiceFunction 接口如下
class Custom2 : DecoratingHttpServiceFunction {override fun serve(delegate: HttpService, ctx: ServiceRequestContext, req: HttpRequest): HttpResponse {println(另一种装饰器 ...)return delegate.serve(ctx, req)}}Ps那为什么在 Server.builder().service 中没有直接用 Custom2 这种呢因为作者没有提供这种重载… 你需要实现 DecoratingHttpServiceFunction 并继承 SimpleDecoratingHttpService才能达到这两种效果. 无框架请求
概述
GrpcService 支持无框架的请求也就是说你可以使用传统的 protobuf 或 JSON API而无需使用 gRPC 的二进制格式 来调用 gRPC 服务. 这对于将现有 HTTP POST API 迁移到 gRPC 非常有用几乎无缝迁移.
使用方式
使用 Armeria 的 GrpcService可以通过开启 enableUnframedRequests(true) 来支持无框架请求 fun newServer(port: Int): Server {return Server.builder().http(port).service(GrpcService.builder().addService(HelloServiceGrpcFacade()).enableUnframedRequests(true) // 启用无框请求.build(),CustomDecorator.newDecorator(),).build()}客户端请求方式如下
二进制 Protobuf 请求类型HTTP POSTURL: /org.cyk.armeria.grpc.hello.HelloService/HelloContent-Type: application/protobuf请求体使用二进制的 protobuf 格式 JSON 请求类型 HTTP POSTURL/org.cyk.armeria.grpc.hello.HelloService/HelloContent-Type: application/json; charsetutf-8请求体使用 JSON 格式. Ps注意上述 URL 分为三个部分 包名org.cyk.armeria.grpc.hello服务名HelloService方法名Hello 例如这里使用 JSON 请求
curl -X POST http://localhost:9000/org.cyk.armeria.grpc.hello.HelloService/Hello \-H Content-Type: application/json \-d {name: cyk}响应如下
{msg: hello cyk ~
}阻塞任务处理器
背景
Armeria 默认是非阻塞的采用 事件循环模型 来处理请求.
什么是事件循环模型实际上可以类比为 生产者 消费者 模式.
生产者事件循环线程 EventLoop负责监听网络 I/O 事件把收到的客户端请求放入到一个任务队列中此时生产者不会阻塞而是继续处理下一个请求.消费者监听队列中是否有新任务如果有就从队列中取出任务并处理.
当执行的任务都是非阻塞任务时不耗时Aemeria 这种架构可以处理大量的并发请求但是如果来了一些阻塞任务耗时任务就会拖慢整个事件驱动的处理速度.
例如有 4 个事件循环线程在处理假设两个生产者两个消费者此时来了一个 5s 的数据库查询任务那么拿到整个任务的生产者不会有什么事只需要把任务放队列中而拿到整个任务的消费者就会阻塞 5s也就意味这这 5s 里只剩下一个 消费者 在处理 队列中的任务也就相当于整个事件驱动模型的任务处理速度大大下降.
概述
阻塞任务处理器是一个 可以缓存的线程池行为类似于 Executors.newCachedThreadPool()线程会根据任务需求动态创建并在任务完成后回收空闲线程. 目的就是为了将 耗时任务 与 事件循环分离.
例如某个方法被标注为需要使用阻塞处理器那么该方法就会交给阻塞处理器管理而其他方法还是基于事件循环模型来处理任务.
多种使用方式
1注解式将某个类或者某个方法交给阻塞处理器.
Blocking // 让整个类中的方法都在阻塞任务执行器中运行(可以标注类也可以标注方法)
class HelloServiceGrpcFacade: HelloServiceImplBase() {override fun hello(request: Hello.HelloReq,responseObserver: StreamObserverHelloResp) {Thread.sleep(2000) //模拟耗时任务val resp HelloResp.newBuilder().setMsg(hello ${request.name} ~).build()responseObserver.onNext(resp)responseObserver.onCompleted()}}2全局配置服务使得一个 GrpcService 下的所有服务都会在阻塞任务处理器中执行.
object ArmeriaGrpcBean {fun newServer(port: Int): Server {return Server.builder().http(port).service(GrpcService.builder().addService(HelloServiceGrpcFacade()).enableUnframedRequests(true).useBlockingTaskExecutor(true) // 这个 grpc 服务下的所有方法都会使用阻塞执行器.build(),CustomDecorator.newDecorator(),).build()}}3编程式控制某一段逻辑使用阻塞任务处理器. override fun hello(request: Hello.HelloReq,responseObserver: StreamObserverHelloResp) {// 注意: 所有交给阻塞处理器执行的任务都是异步的(线程池)这样使用的前提是异步不干扰后续的业务逻辑ServiceRequestContext.current().blockingTaskExecutor().submit {Thread.sleep(2000) //模拟耗时任务println(耗时任务处理完成)}val resp HelloResp.newBuilder().setMsg(hello ${request.name} ~).build()responseObserver.onNext(resp)responseObserver.onCompleted()}rpc 异常统一处理
使用方式
自定义异常类 HelloException. 自定义异常处理实现 GrpcExceptionHandlerFunction 接口
import com.linecorp.armeria.common.RequestContext
import com.linecorp.armeria.common.grpc.GrpcExceptionHandlerFunction
import io.grpc.Metadata
import io.grpc.Status/*** 自定义异常* author: yikang.chen*/
class HelloException (errorMsg: String
): IllegalStateException(errorMsg)/*** 统一异常处理* author: yikang.chen*/
class GrpcExceptionHandler: GrpcExceptionHandlerFunction {override fun apply(ctx: RequestContext, status: Status, cause: Throwable, metadata: Metadata): Status? {when (cause) {is HelloException - Status.NOT_FOUND.withCause(cause).withDescription(cause.message)is IllegalArgumentException - return Status.INVALID_ARGUMENT.withCause(cause)else - return null}return null}}rpc 方法引发异常如下 override fun hello(request: Hello.HelloReq,responseObserver: StreamObserverHelloResp) {if (1 1 2) {throw HelloException(异常 :( )}val resp HelloResp.newBuilder().setMsg(hello ${request.name} ~).build()responseObserver.onNext(resp)responseObserver.onCompleted()}无框架模式调用后结果如下
更详细的异常信息
如果觉得异常信息不够详细还可以启用详细的异常响应 fun newServer(port: Int): Server {// 启用详细异常响应 System.setProperty(com.linecorp.armeria.verboseResponses, true);return Server.builder().http(port).service(GrpcService.builder().addService(HelloServiceGrpcFacade()).enableUnframedRequests(true).exceptionHandler(GrpcExceptionHandler()).build(),CustomDecorator.newDecorator(),).build()}无框模式调用
客户端调用太长这里只截取关键的一部分
io.grpc.StatusRuntimeException: UNKNOWNat io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:268)at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:249)at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:167)at org.cyk.armeria.grpc.hello.HelloServiceGrpc$HelloServiceBlockingStub.hello(HelloServiceGrpc.java:160)at HelloServiceGrpcFacadeTests.test1(HelloServiceGrpcFacadeTests.kt:32)exception.HelloException: 异常 :(
com.linecorp.armeria.common.grpc.StatusCauseException: exception.HelloException: 异常 :( at service.HelloServiceGrpcFacade.hello(HelloServiceGrpcFacade.kt:16)at org.cyk.armeria.grpc.hello.HelloServiceGrpc$MethodHandlers.invoke(HelloServiceGrpc.java:210)Armeria 提供 gRPC 客户端多种调用方式
同步调用 Testfun test() {val client GrpcClients.newClient(gprotohttp://127.0.0.1:9000/,HelloServiceBlockingStub::class.java)val req HelloReq.newBuilder().setName(cyk).build()val resp client.hello(req)val expect hello cyk ~require(resp.msg expect ) { expect: $expect, actual: ${resp.msg} }}异步调用
不用等待结果使用回调函数来处理服务响应. Testfun testFutures() {val client GrpcClients.newClient(gprotohttp://127.0.0.1:9000/,HelloServiceFutureStub::class.java)val req HelloReq.newBuilder().setName(cyk).build()val futureResp client.hello(req)Futures.addCallback(futureResp, object : FutureCallbackHelloResp {override fun onSuccess(result: HelloResp?) {assertNotNull(result)val expect hello cyk ~require(result.msg expect ) { expect: $expect, actual: ${result.msg} }}override fun onFailure(t: Throwable) {t.printStackTrace()}}, MoreExecutors.directExecutor())// Ps: MoreExecutors.directExecutor() 是 Guava 提供的特殊 Executor 实现它不会为任务创建新的线程也不会在线程池中执行任务。// 相反它会直接在调用任务提交方法的当前线程中执行任务。// 等待异步完成(仅为演示实际可能需要更多的非阻塞方式处理)futureResp.get()}使用装饰器
这里和 服务端类似客户端也可以使用装饰器例如可以使用 Armeria 内置的 日志服务 来记录详细日志. Testfun testDecorator() {val client GrpcClients.builder(gprotohttp://127.0.0.1:9000/).serializationFormat(GrpcSerializationFormats.PROTO) //使用 protobuf 序列化.responseTimeoutMillis(10000) // 响应超时时间为 10 秒.decorator(LoggingClient.newDecorator()) // 添加日志装饰器.build(HelloServiceBlockingStub::class.java)val req HelloReq.newBuilder().setName(cyk).build()val resp client.hello(req)val expect hello cyk ~require(resp.msg expect ) { expect: $expect, actual: ${resp.msg} }}调用后在客户端可以看到详细的请求和响应日志
15:08:55.154 [armeria-common-worker-nio-3-2] DEBUG com.linecorp.armeria.client.logging.LoggingClient -- [creqId44e997a8, chanId6412712d, laddr127.0.0.1:61322, raddr127.0.0.1:9000][http://127.0.0.1:9000/org.cyk.armeria.grpc.hello.HelloService/Hello#POST] Request: {startTime2024-10-01T07:08:54.969Z(1727766534969348), Connection: {total2024-10-01T07:08:55.006348Z[78689µs(78689500ns)], socket2024-10-01T07:08:55.013353Z[70385µs(70385500ns)]}, length10B, duration136ms(136383200ns), schemegprotoh2c, nameHello, headers[:methodPOST, :path/org.cyk.armeria.grpc.hello.HelloService/Hello, :authority127.0.0.1:9000, content-typeapplication/grpcproto, tetrailers, grpc-accept-encodinggzip, grpc-timeout10000000u, content-length10, user-agentarmeria/1.30.1], contentDefaultRpcRequest{serviceTypeGrpcLogUtil, serviceNameorg.cyk.armeria.grpc.hello.HelloService, methodHello, params[name: cyk
]}}
15:08:55.155 [armeria-common-worker-nio-3-2] DEBUG com.linecorp.armeria.client.logging.LoggingClient -- [creqId44e997a8, chanId6412712d, laddr127.0.0.1:61322, raddr127.0.0.1:9000][http://127.0.0.1:9000/org.cyk.armeria.grpc.hello.HelloService/Hello#POST] Response: {startTime2024-10-01T07:08:55.142Z(1727766535142599), length18B, duration2549µs(2549100ns), totalDuration177ms(177998100ns), headers[:status200, content-typeapplication/grpcproto, grpc-encodingidentity, grpc-accept-encodinggzip, serverArmeria/1.30.1, dateTue, 1 Oct 2024 07:08:55 GMT], contentCompletableRpcResponse{msg: hello cyk ~
}, trailers[EOS, grpc-status0]}负载均衡
简单案例
在 Armeria 中EndpointGroup 是管理多个服务实例的工具.
默认负载均衡策略为: EndpointSelectionStrategy.weightedRoundRobin() - 加权轮询每个实例的权重默认是 1000 Testfun test() {// 定义多个服务实例// 默认负载均衡策略为: EndpointSelectionStrategy.weightedRoundRobin() - 加权轮询每个实例的权重默认是 1000val instanceGroup EndpointGroup.of(Endpoint.of(localhost, 9001),Endpoint.of(localhost, 9002),)val clientLB GrpcClients.builder(gprotohttp://group/).endpointRemapper { instanceGroup }.build(HelloServiceBlockingStub::class.java)val req HelloReq.newBuilder().setName(cyk).build()for (i in 1..3) {val resp clientLB.hello(req)val expect hello cyk ~require(resp.msg expect ) { expect: $expect, actual: ${resp.msg} }}}Armeria 提供的所有负载均衡策略 加权轮询策略EndpointSelectionStrategy.weightedRoundRobin() 解释默认采用的策略每个节点权重默认为 1000. 加权轮询策略会尝试在长时间内公平的根据权重分配请求在短时间内可能会有偏差短时间内几乎为轮询特别时请求量小而权重分配特别大时随着请求量的增加会越来越解决预期的分配比例. 当 请求量 等于 所有实例的权重之和 时就可以看到请求的量完全匹配权重的比例.使用场景这个比较玄学… 所有实例权重之和大于请求量时几乎是轮询或者偏差而所有实例权重之和小于请求量时几乎严格按照权重分配的比例. 所以得看你怎么分配了. 普通轮询策略EndpointSelectionStrategy.roundRobin() 解释不考虑权重完全平均分配.使用场景所有节点性能都差不多或者你只是希望流量能均匀分布到各个机器. 逐步提升权重策略EndpointSelectionStrategy.rampingUp() 解释这个策略是为新加入节点设计的并且不会像 加权轮询策略那么玄学无论请求和权重之间比例怎么样都会尽量按照权重分配. 当有新的实例加入集群时系统不会立即让他处理大量请求而是逐渐提高他的负载最后慢慢符合权重比例.使用场景适合动态扩展服务器时使用特别是不想新节点刚启动时被过多请求压垮. 粘性负载均衡策略EndpointSelectionStrategy.sticky() 解释这个策略让特定的请求始终被分配到同一个节点。它会基于某个特定的条件比如用户 ID 或者 cookie生成一个哈希值这个哈希值会决定某个请求固定发送到某个节点。这样某些请求总是发送到同一个服务器便于该服务器进行缓存等优化使用场景适用于需要某类请求固定走同一台服务器的情况比如同一个用户的请求总是被发送到相同的服务器这样服务器可以缓存用户的相关数据提升性能。
进阶使用
这里以 加权负载均衡策略 为例其他策略使用方式也一样 Testfun testCustom() {//使用加权负载均衡策略默认权重为 1000val strategy EndpointSelectionStrategy.weightedRoundRobin() // 这也是默认策略val instanceGroup EndpointGroup.of(strategy,Endpoint.of(localhost, 9001).withWeight(1),Endpoint.of(localhost, 9002).withWeight(9),)var c9001 0var c9002 0// 客户端装饰器: 记录调用次数// Ps这里偷了个懒建议还是专门弄个类然后实现 DecoratingHttpClientFunction 接口val decorator DecoratingHttpClientFunction { delegate, ctx, req -if (ctx.endpoint()!!.port() 9001) {c9001} else {c9002}returnDecoratingHttpClientFunction delegate.execute(ctx, req)}val clientLB GrpcClients.builder(gprotohttp://group/).endpointRemapper { instanceGroup }.decorator(decorator).build(HelloServiceBlockingStub::class.java)val req HelloReq.newBuilder().setName(cyk).build()for (i in 1..100) {val resp clientLB.hello(req)val expect hello cyk ~require(resp.msg expect ) { expect: $expect, actual: ${resp.msg} }}println(9001调用次数: $c9001)println(9002调用次数: $c9002)}日志如下
9001调用次数: 10
9002调用次数: 90Nacos 集成 Armeria
概述
此处 Nacos 作为服务注册和发现中心Armeria 从 Nacos 对应的服务集群中获取健康的实例列表来实现负载均衡最后通过 Armeria 的 gRPC 客户端将请求分发.
实现步骤
1依赖配置 implementation (com.alibaba.nacos:nacos-client:2.3.2)2Armeria 配置 gRPC 服务并注册到 Nacos 中.
object NacosBean {fun newService(): NamingService NacosFactory.createNamingService(Properties().apply {put(serverAddr, 100.64.0.0:8848)put(namespace, 0dc9a7f0-5f97-445a-87e5-9fe6869d6708) //可选默认命名空间为 public (自定义命名空间需要提前在 nacos 客户端上创建此处填写命名空间ID)})}fun main() {val server1 ArmeriaGrpcBean.newServer(9001)val server2 ArmeriaGrpcBean.newServer(9002)server1.start().join()server2.start().join()// 连接 nacos并注册集群val nacos NacosBean.newService()val instance1 Instance().apply {ip 100.94.135.96port 9001clusterName grpc-hello}val instance2 Instance().apply {ip 100.94.135.96port 9002clusterName grpc-hello}nacos.batchRegisterInstance(helloGrpcService, DEFAULT, listOf(instance1, instance2))
}3Armeria 客户端从 Nacos 获取健康实例列表实现负载均衡. Testfun testNacosLB() {// 从 nacos 中获取 helloGrpcService 服务下所有 健康 的服务实例val endpointGroup NacosBean.newService().selectInstances(helloGrpcService, DEFAULT, true) // healthy: true.map { Endpoint.of(it.ip, it.port) }.let { endpoints -EndpointGroup.of(EndpointSelectionStrategy.roundRobin(),endpoints)}val clientLB GrpcClients.builder(gprotohttp://group/).endpointRemapper { endpointGroup }.decorator(DecoratingHttpClientFunction { delegate, ctx, req -println(目标端点: ${ctx.endpoint()!!.port()})returnDecoratingHttpClientFunction delegate.execute(ctx, req)}).build(HelloServiceBlockingStub::class.java)val req HelloReq.newBuilder().setName(cyk).build()for (i in 0..10) {val resp clientLB.hello(req)val expect hello cyk ~require(resp.msg expect ) { expect: $expect, actual: ${resp.msg} }}}文档服务
概述
在 Armeria 中文档服务会自动帮我们生成 API 文档包括 gRPC、HTTP、Thrift 等服务的接口定义文档不仅可以看到接口的定义方式、还可以对接口进行调试非常方便.
实现步骤
只需要在配置 Server 的时候添加文档服务即可
object ArmeriaGrpcBean {fun newServer(port: Int): Server {return Server.builder().http(port).service(GrpcService.builder().addService(HelloServiceGrpcFacade()).enableUnframedRequests(true).exceptionHandler(GrpcExceptionHandler()).build(),).serviceUnder(/docs, DocService()) // 添加文档服务.build()}}启动后访问 ip:port/docs 就可以看到对应的页面 点击对应的服务右上角就可以进行 Debug. Ps防伪签名 yikang.chen | 未经本人允许不得转载.