Problem:
Solution: make the cache a static field: private static final ...
package com.nj.snx.app.functions;

import com.nj.snx.common.LruLinkedHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.functions.ScalarFunction;

import java.util.Arrays;
import java.util.List;

/**
 * @author :cuicc
 * @date : 2022/5/16 9:32
 */
public class ContainsIndexFunction extends ScalarFunction {

    // Declared static so the cache is shared by every instance of this function in the same JVM
    private static final LruLinkedHashMap<String, List<String>> quotaIdMaps = new LruLinkedHashMap<>(50);

    /**
     * @param businessKey serial number of the record
     * @param quotas      comma-separated derived-indicator IDs to compute, each in the form PC_CODE-INDEX_ID
     * @param indexId     a derived-indicator ID from the configuration
     * @return true if indexId is among the indicators to compute for this businessKey
     */
    public boolean eval(String businessKey, String quotas, String indexId) {
        // System.out.println("derived-indicator IDs to process: " + quotas + ", received indexId: " + indexId);
        if (StringUtils.isNotEmpty(quotas)) {
            List<String> strings = quotaIdMaps.get(businessKey);
            System.out.println("map----->" + strings + ":businessKey:" + businessKey + ";quotas:" + quotas
                    + ";indexId:" + indexId + ":thread:" + Thread.currentThread().getId());
            if (strings != null && strings.contains(indexId)) {
                System.out.println("hit:" + indexId + ":businessKey:" + businessKey + ";quotas:" + quotas
                        + ":thread:" + Thread.currentThread().getId());
                return true;
            } else if (strings == null) {
                System.out.println("cache miss:" + strings + ":businessKey:" + businessKey + ";quotas:" + quotas
                        + ":thread:" + Thread.currentThread().getId());
                String[] quotaIds = quotas.split(",");
                List<String> quotaIdsList = Arrays.asList(quotaIds);
                // Keep only the part before the first '-' of each PC_CODE-INDEX_ID entry
                for (int i = 0; i < quotaIdsList.size(); i++) {
                    quotaIdsList.set(i, quotaIdsList.get(i).split("-")[0]);
                }
                quotaIdMaps.put(businessKey, quotaIdsList);
                if (quotaIdsList.contains(indexId)) {
                    return true;
                }
            }
        }
        return false;
    }
}
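The LruLinkedHashMap referenced above is a project-internal class whose source is not shown here. As a hedged sketch of what it most likely looks like — a LinkedHashMap kept in access order that evicts the least-recently-used entry once a maximum capacity is exceeded — the class below is an assumption, not the project's actual code:

import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of com.nj.snx.common.LruLinkedHashMap:
// a fixed-capacity LRU cache built on LinkedHashMap.
public class LruLinkedHashMap<K, V> extends LinkedHashMap<K, V> {

    private final int maxCapacity;

    public LruLinkedHashMap(int maxCapacity) {
        super(16, 0.75f, true); // accessOrder = true gives LRU iteration order
        this.maxCapacity = maxCapacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Evict the least-recently-used entry when the cap is exceeded
        return size() > maxCapacity;
    }
}

Note that LinkedHashMap is not thread-safe; since the field above is static and shared by every parallel function instance in the same TaskManager JVM, the real class would need synchronized access, or the occasional race has to be acceptable for a pure cache.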
Problem:
Solution: doing so resolves it.
Problem:
Solution: wrap the query in one more outer layer; once the Top-N is taken through that outer query, the error no longer occurs (a sketch follows).
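For reference, a minimal sketch of the wrapped query, assuming the Flink 1.13+ Table API; the table ShopSales and its columns are made-up placeholders. Flink's streaming planner only recognizes Top-N when the ROW_NUMBER() OVER (...) subquery is wrapped in an outer query that filters on the rank column, which is why adding the outer layer removes the error:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TopNExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // ... register/create the ShopSales table first (omitted) ...

        // ROW_NUMBER() goes in the inner query; the OUTER query's WHERE clause
        // on the rank column is what makes the planner accept it as Top-N.
        tableEnv.executeSql(
                "SELECT * FROM (" +
                "  SELECT *, ROW_NUMBER() OVER (" +
                "    PARTITION BY category ORDER BY sales DESC) AS rownum" +
                "  FROM ShopSales" +
                ") WHERE rownum <= 3");
    }
}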
Solution: set debeziumProperties and pass the following parameters into the method:
properties.setProperty("bigint.unsigned.handling.mode","long");
properties.setProperty("decimal.handling.mode","double");
Solution:
bin/kafka-consumer-groups.sh --bootstrap-server s203:9092 --group ddd --describe
The link above describes the same problem. In fact Flink does not hand consumption over to Kafka's own group-management feature: the connector implements FlinkKafkaConsumer itself and does not follow Kafka's coordinator / JOIN_GROUP protocol, so the consumer group never registers with the broker and cannot be queried there.
Although the group cannot be found on the Kafka broker, the consumption metrics (current offsets, lag, etc.) can be queried in the Flink Web UI under Metrics.
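If you want kafka-consumer-groups.sh to show at least the committed offsets and lag, you can have Flink commit offsets back to Kafka on each checkpoint. A hedged sketch using the legacy FlinkKafkaConsumer (topic name and addresses are placeholders); even then the group will list no active members, because Flink never actually joins it:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaOffsetVisibilityExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "s203:9092");
        // group.id is only used when committing offsets back to Kafka;
        // Flink assigns partitions itself and never joins the group.
        props.setProperty("group.id", "ddd");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        // With checkpointing enabled, commit offsets on each checkpoint so that
        // kafka-consumer-groups.sh --describe shows the committed offsets/lag.
        consumer.setCommitOffsetsOnCheckpoints(true);
    }
}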