资讯详情

基于Flink 1.13.2问题集锦

问题:

问题:

解决方案:静态缓存private static final ...

package com.nj.snx.app.functions;  import com.nj.snx.common.LruLinkedHashMap; import org.apache.commons.lang3.StringUtils; import org.apache.flink.table.functions.ScalarFunction;  import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.function.Consumer;  /**  * @author :cuicc  * @date : 2022/5/16 9:32  */ public class ContainsIndexFunction  extends ScalarFunction {      //设置静态     private static final LruLinkedHashMap<String,List<String>> quotaIdMaps = new LruLinkedHashMap<>(50);      /**      * @param businessKey 流水号      * @param quotas  逗号隔开 衍生指标需要计算id  PC_CODE-INDEX_ID      * @param indexId 所有配置中的衍生指标id      * @return      */    public boolean eval(String businessKey,String quotas,String indexId){ //       System.out.println("自定义函数需要加工的衍生指标ID】" quotas ",获取的indexId:" indexId);        if(StringUtils.isNotEmpty(quotas)){            List<String> strings = quotaIdMaps.get(businessKey);            System.out.println("map----->" strings ":businessKey:" businessKey ";quotas:" quotas ";indexId:" indexId ":当前线程:" Thread.currentThread().getId());            if(strings != null && strings.contains(indexId)){                System.out.println("命中:" indexId ":businessKey:" businessKey ";quotas:" quotas ":当前线程:" Thread.currentThread().getId());                return true;            }else if(strings == null){                System.out.println("为null:" strings ":businessKey:" businessKey ";quotas:" quotas ":当前线程:" Thread.currentThread().getId());                String[] quotaIds = quotas.split(",");                List<String> quotaIdsList = Arrays.asList(quotaIds);                for (int i = 0; i < quotaIdsList.size(); i  ) {                    quotaIdsList.set(i,quotaIdsList.get(i).split("-")[0]);                }                quotaIdMaps.put(businessKey,quotaIdsList);                if(quotaIdsList.contains(indexId)){                    return true;                }            }        }        return false;    } } 

问题:

解决:即可解决。

问题:

解决方案:一层外套,取TOP-N之后就不会报错了。

解决方案:设置debeziumProperties将以下参数传入方法:

properties.setProperty("bigint.unsigned.handling.mode","long");

properties.setProperty("decimal.handling.mode","double");

解:

bin/kafka-consumer-groups.sh  --bootstrap-server s203:9092 --group ddd --describe

没有相应的消费者。[FLINK-11325] Flink Consumer Kafka Topic Not Found ConsumerID - ASF JIRAhttps://issues.apache.org/jira/browse/FLINK-11325

上面的链接反映了这个问题,实际上是flink没有使用kafka直接控制特性kafka消费,而不是交由kafka去消费,connector自己实现了FlinkKafkaConsumer,且没有按照kafka的feature实现coordinator以及JOIN_GROUOP的逻辑,消费者组在服务器中没有查询。

从kafka服务器找不到,但这些指标可以在Flink Web的Metrics可在指标中查询,

标签: 固态继电器s203zl

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台