当前位置: 首页 > news >正文

全国学校信息查询官网seo策划

全国学校信息查询官网,seo策划,制作网站的模板,网站的功能设计背景 在flink中,我们需要对我们写的map转换函数,process处理函数进行单元测试,测试的内容包括查看函数的输出结果是否符合以及函数内的状态是否正确更新,本文就记录几个测试过程中的要点 flink中测试函数 首先我们根据我们要测…

背景

在flink中,我们需要对我们写的map转换函数,process处理函数进行单元测试,测试的内容包括查看函数的输出结果是否符合以及函数内的状态是否正确更新,本文就记录几个测试过程中的要点

flink中测试函数

首先我们根据我们要测试的是数据流的类型选择不同的测试套件,如下所示:

  1. OneInputStreamOperatorTestHarness:适用于 DataStreams 数据流
  2. KeyedOneInputStreamOperatorTestHarness:适用于 KeyedStreams 分组后的数据流
  3. TwoInputStreamOperatorTestHarness:适用于两个数据流DataStream的 ConnectedStream
  4. KeyedTwoInputStreamOperatorTestHarness:适用于两个 KeyedStream 的 ConnectedStream

其次,根据是测试map函数还是process函数,我们选择不同的操作符,如果是map函数我们选择StreamFlatMap算子(可同时处理FlatMap和带状态的RichFlatmap函数)还是ProcessFunctionTestHarnesses.forXX算子

map函数测试代码:

@Testpublic void testStateFlatMap() throws Exception {StatefulFlatMap statefulFlatMap = new StatefulFlatMap();// OneInputStreamOperatorTestHarness takes the input and output types as type parametersOneInputStreamOperatorTestHarness<String, String> testHarness =// KeyedOneInputStreamOperatorTestHarness takes three arguments:// Flink operator object, key selector and key typenew KeyedOneInputStreamOperatorTestHarness<String, String, String>(new StreamFlatMap<>(statefulFlatMap),x -> "1", Types.STRING);testHarness.open();// test first recordtestHarness.processElement("world", 10);ValueState<String> previousInput =statefulFlatMap.getRuntimeContext().getState(new ValueStateDescriptor<>("previousInput", Types.STRING));String stateValue = previousInput.value();Assert.assertEquals(Lists.newArrayList(new StreamRecord<>("hello world", 10)),testHarness.extractOutputStreamRecords());Assert.assertEquals("world", stateValue);// test second recordtestHarness.processElement("parallel", 20);Assert.assertEquals(Lists.newArrayList(new StreamRecord<>("hello world", 10),new StreamRecord<>("hello parallel world", 20)), testHarness.extractOutputStreamRecords());Assert.assertEquals("parallel", previousInput.value());}public class StatefulFlatMap extends RichFlatMapFunction<String, String> {ValueState<String> previousInput;@Overridepublic void open(Configuration parameters) throws Exception {previousInput = getRuntimeContext().getState(new ValueStateDescriptor<String>("previousInput", Types.STRING));}@Overridepublic void flatMap(String in, Collector<String> collector) throws Exception {String out = "hello " + in;if(previousInput.value() != null){out = out + " " + previousInput.value();}previousInput.update(in);collector.collect(out);}
}

process处理函数代码:

@Testpublic void testProcessElement() throws Exception {MyProcessFunction myProcessFunction = new MyProcessFunction();OneInputStreamOperatorTestHarness<String, String> testHarness =ProcessFunctionTestHarnesses.forKeyedProcessFunction(myProcessFunction, x -> "1", Types.STRING);// Function time is initialized to 0testHarness.open();testHarness.processElement("world", 10);Assert.assertEquals(Lists.newArrayList(new StreamRecord<>("hello world", 10)),testHarness.extractOutputStreamRecords());}@Testpublic void testOnTimer() throws Exception {MyProcessFunction myProcessFunction = new MyProcessFunction();OneInputStreamOperatorTestHarness<String, String> testHarness =ProcessFunctionTestHarnesses.forKeyedProcessFunction(myProcessFunction, x -> "1", Types.STRING);testHarness.open();testHarness.processElement("world", 10);Assert.assertEquals(1, testHarness.numProcessingTimeTimers());// Function time is set to 50testHarness.setProcessingTime(50);Assert.assertEquals(Lists.newArrayList(new StreamRecord<>("hello world", 10),new StreamRecord<>("Timer triggered at timestamp 50")),testHarness.extractOutputStreamRecords());}public class MyProcessFunction extends KeyedProcessFunction<String, String, String> {@Overridepublic void processElement(String in, Context context, Collector<String> collector) throws Exception {context.timerService().registerProcessingTimeTimer(50);String out = "hello " + in;collector.collect(out);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {out.collect(String.format("Timer triggered at timestamp %d", timestamp));}}

此外附加官方的map函数的测试代码:

/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements.  See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License.  You may obtain a copy of the License at**    http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.flink.streaming.api.operators;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;import org.junit.Assert;
import org.junit.Test;import java.util.concurrent.ConcurrentLinkedQueue;/*** Tests for {@link StreamMap}. These test that:** <ul>*   <li>RichFunction methods are called correctly*   <li>Timestamps of processed elements match the input timestamp*   <li>Watermarks are correctly forwarded* </ul>*/
public class StreamFlatMapTest {private static final class MyFlatMap implements FlatMapFunction<Integer, Integer> {private static final long serialVersionUID = 1L;@Overridepublic void flatMap(Integer value, Collector<Integer> out) throws Exception {if (value % 2 == 0) {out.collect(value);out.collect(value * value);}}}@Testpublic void testFlatMap() throws Exception {StreamFlatMap<Integer, Integer> operator =new StreamFlatMap<Integer, Integer>(new MyFlatMap());OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);long initialTime = 0L;ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();testHarness.open();testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 2));testHarness.processWatermark(new Watermark(initialTime + 2));testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 3));testHarness.processElement(new StreamRecord<Integer>(4, initialTime + 4));testHarness.processElement(new StreamRecord<Integer>(5, initialTime + 5));testHarness.processElement(new StreamRecord<Integer>(6, initialTime + 6));testHarness.processElement(new StreamRecord<Integer>(7, initialTime + 7));testHarness.processElement(new StreamRecord<Integer>(8, initialTime + 8));expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 2));expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 2));expectedOutput.add(new Watermark(initialTime + 2));expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 4));expectedOutput.add(new StreamRecord<Integer>(16, initialTime + 4));expectedOutput.add(new StreamRecord<Integer>(6, initialTime + 6));expectedOutput.add(new StreamRecord<Integer>(36, initialTime + 6));expectedOutput.add(new StreamRecord<Integer>(8, initialTime + 8));expectedOutput.add(new StreamRecord<Integer>(64, initialTime + 8));TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());}@Testpublic void testOpenClose() throws Exception {StreamFlatMap<String, String> operator =new StreamFlatMap<String, String>(new TestOpenCloseFlatMapFunction());OneInputStreamOperatorTestHarness<String, String> testHarness =new OneInputStreamOperatorTestHarness<String, String>(operator);long initialTime = 0L;testHarness.open();testHarness.processElement(new StreamRecord<String>("Hello", initialTime));testHarness.close();Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseFlatMapFunction.closeCalled);Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);}// This must only be used in one test, otherwise the static fields will be changed// by several tests concurrentlyprivate static class TestOpenCloseFlatMapFunction extends RichFlatMapFunction<String, String> {private static final long serialVersionUID = 1L;public static boolean openCalled = false;public static boolean closeCalled = false;@Overridepublic void open(OpenContext openContext) throws Exception {super.open(openContext);if (closeCalled) {Assert.fail("Close called before open.");}openCalled = true;}@Overridepublic void close() throws Exception {super.close();if (!openCalled) {Assert.fail("Open was not called before close.");}closeCalled = true;}@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {if (!openCalled) {Assert.fail("Open was not called before run.");}out.collect(value);}}
}

包含同时测试FlatMap和RichFlatMap函数,但是其中没有操作状态,我前面的例子包含了RichFlatMap状态的测试

参考文献:
https://flink.apache.org/2020/02/03/a-guide-for-unit-testing-in-apache-flink/

http://www.tj-hxxt.cn/news/28973.html

相关文章:

  • 网站外网怎么做seo建站系统
  • 适合前端新手做的网站谷歌搜索引擎入口2021
  • 怎样在网站上做推广百度竞价推广是什么工作
  • 万户高端网站建设外贸快车
  • nas 支持做网站无货源电商怎么做
  • 合肥做网站优化公司seo查询网站
  • 西宁网站建设搜q479185700系统优化的例子
  • 烟台网站建设ytwzjs网络营销活动策划
  • 用网站制作自己app软件产品推广运营方案
  • 本地门户网站源码网络营销推广公司网站
  • 网站被k的怎么办热门关键词排名查询
  • 广州公司网站1688精品货源网站入口
  • 上海模板建站哪家好六种常见的网站类型
  • 广州大型网站建设公司腾讯广告
  • 做淘宝图的素材搜索网站企业邮箱如何申请注册
  • 重大军事新闻视频绍兴seo外包
  • 传奇怎么建设自己的网站整站seo技术
  • 网站开发定制多少钱百度seo优化按年收费
  • 网站后台seo优化如何做黄页推广引流
  • 音乐网站数据库怎么做推广方案模板
  • 临沂做网站建设的公司在线seo诊断
  • 门户网站搭建方案企业网站营销的优缺点及案例
  • 本地调试wordpress百度关键词优化推广
  • 上海建网站开发公google chrome网页版
  • 海南网站优化广州今天新闻
  • 动态网站开发环境搭建seo综合查询接口
  • 安徽合肥网站制作公司免费b站推广网站2023
  • 东莞清溪网站制作关于网络营销的方法
  • 那些网站做汽车可靠性超级软文网
  • 做网站的公司济南赛博科技市场俄罗斯搜索引擎yandex