Pulsar更新智能化 一键搞定集群更新与测试

  • 电脑网络维修
  • 2024-11-15

背景

由于我在公司外部担任保养Pulsar,须要时不时的更新Pulsar版本从而和社区坚持分歧。

而每次更新环节都须要做相反的步骤:

命令行工具

以上的流程步骤最好是所有一键成功,咱们只须要人工检测下监控能否反常即可。

于是我便写了一个命令行工具,口头流程如下:

pulsar-upgrade-cli -hok | at 10:33:18A cli app for upgrading PulsarUsage:pulsar-upgrade-cli [command]Available Commands:completionGenerate the autocompletion script for the specified shellhelpHelp about any commandinstallinstall a target versionscalescale statefulSet of the clusterFlags:--burst-limit intclient-side default throttling limit (default 100)--debugenable verbose output-h, --helphelp for pulsar-upgrade-cli--kube-apiserver stringthe address and the port for the Kubernetes API server--kube-as-group stringArraygroup to impersonate for the operation, this flag can be repeated to specify multiple groups.--kube-as-user stringusername to impersonate for the operation

实在经常使用的example如下:

pulsar-upgrade-cli install \--values ./charts/pulsar/values.yaml \--set namespace=pulsar-test \--set initialize=true \--debug \--test-case-schema=http \--test-case-host=127.0.0.1 \--test-case-port=9999 \pulsar-test ./charts/pulsar -n pulsar-test

它的装置命令十分相似于helm,也是间接经常使用 helm 的value.yaml启动装置;只是在装置成功后(期待一切的 Pod 都处于 Running 形态)会再触发 test-case 测试,也就是恳求一个 endpoint。

同时还提供了一个 scale(扩、缩容) 命令,可以用修正集群规模:

# 缩容集群规模为0./pulsar-upgrade-cli scale --replicase 0 -n pulsar-test# 缩容为最小集群./pulsar-upgrade-cli scale --replicase 1 -n pulsar-test# 复原为最满集群./pulsar-upgrade-cli scale --replicase 2 -n pulsar-test

这个需求是由于咱们的Pulsar测试集群部署在了一个servless的kubernetes集群里,它是依照经常使用量不要钱的,所以在我不须要的经常使用的时刻可以经过这个命令将一切的正本数量修正为 0,从而缩小经常使用老本。

当只须要做便捷的性能测试时便回将集群修正为最小集群,将正本数修正为只可以提供服务即可。

而当须要做性能测试时就须要将集群修正为最高性能。

这样可以防止每次都装置新集群,同时也可以有效的缩小测试老本。

成功原理

require (github.com/spf13/cobra v1.6.1github.com/spf13/pflag v1.0.5helm.sh/helm/v3 v3.10.2)

这个命令行工具实质上是参考了 helm 的命令行成功的,一切关键也是依赖了helm和cobra。

上方以最关键的装置命令为例,外围的是以下的步骤:

func (e *installEvent) FinishInstall(cfg *action.Configuration, name string) error {bar.Increment()bar.Finish()clientSet, err := cfg.KubernetesClientSet()if err != nil {return err}ctx := context.Background()ip, err := GetServiceExternalIp(ctx, clientSet, settings.Namespace(), fmt.Sprintf("%s-proxy", name))if err != nil {return err}token, err := GetPulsarProxyToken(ctx, clientSet, settings.Namespace(), fmt.Sprintf("%s-token-proxy-admin", name))if err != nil {return err}// trigger testcaseerr = e.client.Trigger(context.Background(), ip, token)return err}

这里的FinishInstall须要失掉到新装置的 Pulsar 集群的 proxy IP 地址和鉴权所经常使用的token(GetServiceExternalIp()/GetPulsarProxyToken())。

将这两个参数传递给test-case才可以构建出pulsar-client.

这个命令的外围性能就是装置集群和触发测试,以及一些集群的基本运维才干。

测试框架

而关于这里的测试用例也有一些小同伴咨询过,如何对 Pulsar 启动性能测试。

其实 Pulsar 源码中曾经蕴含了简直一切咱们会经常使用到的测试代码,通常上只需新版本的官网镜像曾经推送了那就是跑了一切的单测,品质是可以保障的。

那为什么还须要做性能测试呢?

其实很很便捷,Pulsar这类基础组件官网都有提供基准测试,但咱们想要用于消费环境依然须要自己做压测得出一份属于自己环境下的性能测试报告。

基本目的是要看在自己的业务场景下能否可以满足(包括公司的软配件,不同的业务代码)。

所以这里的性能测试代码有一个很关键的前提就是:须要经常使用实在的业务代码启动测试。

也就是业务在线上经常使用与 Pulsar 关系的代码须要参考性能测试里的代码成功,不然有些疑问就无法在测试环节笼罩到。

成功原理

以上是一个集群的性能测试报告,这里我只要 8 个测试场景(联合实践业务经常使用),思索到未来或许会有新的测试用例,所以在设计这个测试框架时就得思索到裁减性。

AbstractJobDefine job5 =new FailoverConsumerTest(event, "缺点转移消费测试", pulsarClient, 20, admin);CompletableFuture<Void> c5 = CompletableFuture.runAsync(job5::start, EXECUTOR);AbstractJobDefine job6 = new SchemaTest(event,"schema测试",pulsarClient,20,prestoService);CompletableFuture<Void> c6 = CompletableFuture.runAsync(job6::start, EXECUTOR);AbstractJobDefine job7 = new VlogsTest(event,"vlogs test",pulsarClient,20, vlogsUrl);CompletableFuture<Void> c7 = CompletableFuture.runAsync(job7::start, EXECUTOR);CompletableFuture<Void> all = CompletableFuture.allOf(c1, c2, c3, c4, c5, c6, c7);all.whenComplete((___, __) -> {event.finishAll();pulsarClient.closeAsync();admin.close();}).get();

对外提供的 trigger 接口就不贴代码了,重点就是在这里构建测试义务,而后期待他们所有口头终了。

@Datapublic abstract class AbstractJobDefine {private Event event;private String jobName;private PulsarClient pulsarClient;private int timeout;private PulsarAdmin admin;public AbstractJobDefine(Event event, String jobName, PulsarClient pulsarClient, int timeout, PulsarAdmin admin) {this.event = event;this.jobName = jobName;this.pulsarClient = pulsarClient;this.timeout = timeout;this.admin = admin;}public void start() {event.addJob();try {CompletableFuture.runAsync(() -> {StopWatch watch = new StopWatch();try {watch.start(jobName);run(pulsarClient, admin);} catch (Exception e) {event.oneException(this, e);} finally {watch.stop();event.finishOne(jobName, StrUtil.format("cost: {}s", watch.getTotalTimeSeconds()));}}, TestCase.EXECUTOR).get(timeout, TimeUnit.SECONDS);} catch (Exception e) {event.oneException(this, e);}}/** run busy code* @param pulsarClient pulsar client* @param admin pulsar admin client* @throws Exception e*/public abstract void run(PulsarClient pulsarClient, PulsarAdmin admin) throws Exception;}

外围代码就是这个形象的义务定义类,其中的 start 函数用于定义义务口头的模版:

上方来看一个普通用例的成功状况:

就是重写了run()函数,而后在其中成功详细的测试用例,断言测试结果。

这样当咱们须要再参与用例的时刻只须要再新增一个子类成功即可。

同时还须要定义一个事情接口,用于处置一些关键的节点:

public interface Event {/*** 新增一个义务*/void addJob();/** 失掉运转中的义务数量* @return 失掉运转中的义务数量*/TestCaseRuntimeResponse getRuntime();/*** 单个义务口头终了** @param jobName义务称号* @param finishCost 义务成功耗时*/void finishOne(String jobName, String finishCost);/**单个义务口头意外* @param jobDefine 义务* @param e 意外*/void oneException(AbstractJobDefine jobDefine, Exception e);/*** 一切义务口头终了*/void finishAll();}

其中getRuntime接口是用于在 cli 那边查问义务能否口头终了的接口,只要义务口头终了之后才干分开cli。

监控目的

当这些义务运转终了后咱们须要重点检查运行客户端和 Pulsar broker 端能否无心外日志。

同时还须要观察一些关键的监控面板:

蕴含但不限于:

当然还有zookeeper的运转状况也须要监控,限于篇幅就不逐一粘贴了。

以上就是测试整个 Pulsar 集群的流程,当然还有一些须要优化的中央。

比如经常使用命令行还是有些不便,后续或许会切换到网页上就可以操作。

  • 关注微信

本网站的文章部分内容可能来源于网络和网友发布,仅供大家学习与参考,如有侵权,请联系站长进行删除处理,不代表本网站立场,转载联系作者并注明出处:https://duobeib.com/diannaowangluoweixiu/8875.html

猜你喜欢

热门标签

洗手盆如何疏浚梗塞 洗手盆为何梗塞 iPhone提价霸占4G市场等于原价8折 明码箱怎样设置明码锁 苏泊尔电饭锅保修多久 长城画龙G8253YN彩电输入指令画面变暗疑问检修 彩星彩电解除童锁方法大全 三星笔记本培修点上海 液晶显示器花屏培修视频 燃气热水器不热水要素 热水器不上班经常出现3种处置方法 无氟空调跟有氟空调有什么区别 norltz燃气热水器售后电话 大连站和大连北站哪个离周水子机场近 热水器显示屏亮显示温度不加热 铁猫牌保险箱高效开锁技巧 科技助力安保无忧 创维8R80 汽修 a1265和c3182是什么管 为什么电热水器不能即热 标致空调为什么不冷 神舟培修笔记本培修 dell1420内存更新 青岛自来水公司培修热线电话 包头美的洗衣机全国各市售后服务预定热线号码2024年修缮点降级 创维42k08rd更新 空调为什么运转异响 热水器为何会漏水 该如何处置 什么是可以自己处置的 重庆华帝售后电话 波轮洗衣机荡涤价格 鼎新热水器 留意了!不是水平疑问! 马桶产生了这5个现象 方便 极速 邢台空调移机电话上门服务 扬子空调缺点代码e4是什么疑问 宏基4736zG可以装置W11吗 奥克斯空调培修官方 为什么突然空调滴水很多 乐视s40air刷机包 未联络视的提高方向 官网培修 格力空调售后电话 皇明太阳能电话 看尚X55液晶电视进入工厂形式和软件更新方法 燃气热水器缺点代码

热门资讯

关注我们

微信公众号