
When Flink SQL writes data to Kafka, the messages contain the keys before, after, and op. How can these be removed?

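The before, after, and op keys are the fields of a Debezium-style change-event envelope, which Flink emits when a changelog stream is serialized with a CDC format such as debezium-json. To remove them from the data that Flink SQL writes to Kafka, configure the table as follows: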

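1. When creating the table in Flink SQL, configure the Kafka connector and its serialization settings in the WITH clause.

2. Among those settings, name the custom Deserializer class that should handle the record key and value.

3. Implement the custom Deserializer class and override its deserialize method so that it processes the incoming key and value and drops the parts that are not needed.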
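
Before going through the custom-class route, it may help to see how the format option alone decides the shape of the message. The following is a minimal sketch for an append-only (INSERT-only) result; the table name, column names, and topic are hypothetical and not taken from the original question.

```sql
-- Hypothetical sink for an append-only result.
-- Table, column, and topic names are illustrative assumptions.
CREATE TABLE users_append_sink (
  id   BIGINT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'users_append',
  'properties.bootstrap.servers' = 'localhost:9092',
  -- 'json' writes each row as a flat object, e.g. {"id":1,"name":"a"}.
  -- 'debezium-json' would instead wrap the same row in an envelope,
  -- e.g. {"before":null,"after":{"id":1,"name":"a"},"op":"c"}.
  'format' = 'json'
);
```

Note that the plain json format on the kafka connector only accepts insert-only streams; a query that produces updates or deletes is rejected at planning time.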

The detailed steps are as follows:

1. When creating the table, specify the connector, the format, and the deserializer settings in the WITH clause:

CREATE TABLE kafka_table (
  ...
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',
  -- The options below are not built-in options of the Kafka connector or the json format;
  -- they are assumed to be read by the custom class named in 'serde'.
  'json.nullkey' = '\N',
  'json.valueschema' = '{"type":"struct","fields":[{"name":"before","type":"string"},{"name":"after","type":"string"},{"name":"op","type":"string"}]}',
  'serde' = 'com.example.CustomKafkaDeserializer',
  'serde.properties.schema.registry.url' = 'http://localhost:8081'
);
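
When the query produces updates and deletes (for example, because it reads from a CDC source), one commonly used alternative that needs no custom class is the upsert-kafka connector, which writes each row as a flat value keyed by the primary key. The following is a sketch under stated assumptions: users_flat_sink, users_cdc_source, the columns, and the topic are all hypothetical names, and users_cdc_source is assumed to already exist as a changelog table.

```sql
-- Hypothetical upsert sink; all names are illustrative assumptions.
CREATE TABLE users_flat_sink (
  id   BIGINT,
  name STRING,
  -- upsert-kafka requires a primary key; it is written as the Kafka message key.
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'users_flat',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- users_cdc_source is assumed to be an existing changelog table,
-- for example one declared with 'format' = 'debezium-json'.
INSERT INTO users_flat_sink
SELECT id, name FROM users_cdc_source;
```

With this setup, inserts and updates arrive in the topic as flat JSON values without the before, after, and op wrapper, while deletes are written as records with a null value (tombstones), so consumers need to handle those.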

2. Implement the custom Deserializer class:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Implements KafkaDeserializationSchema because deserialize() receives a ConsumerRecord.
public class CustomKafkaDeserializer implements KafkaDeserializationSchema<Row> {
    private final ObjectMapper objectMapper = new ObjectMapper();
    // Names of the envelope fields to strip.
    private final String beforeKey = "before";
    private final String afterKey = "after";
    private final String opKey = "op";
    // Type of the rows this schema produces.
    private final TypeInformation<Row> rowTypeInfo;
    // Settings mirrored from the table options.
    private final boolean ignoreParseErrors;
    private final boolean failOnMissingField;
    private final String nullKey;
    private final String valueSchema;
    private final String schemaRegistryUrl;
    // Fallback values used when a field is absent from the message.
    private final String beforeValueDefault;
    private final String afterValueDefault;
    private final String opValueDefault;
    // ... other fields and methods ...
}

3. In the custom Deserializer class, override the deserialize method to process the incoming key and value and drop the parts that are not needed.

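@Override
public Row deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
    try {
        JsonNode jsonNode = objectMapper.readTree(record.value());
        // Fall back to the default when the field is missing; handle other error cases here if needed.
        // Note: asText() returns an empty string when the field holds a JSON object rather than a scalar.
        String before = jsonNode != null && jsonNode.has(beforeKey) ? jsonNode.get(beforeKey).asText() : beforeValueDefault;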