当前位置:首页 > 行业动态 > 正文

Flink CDC里请问flink1.7如何设置持久化checkpoint到阿里云oss啊?

在Flink 1.7中,要将checkpoint持久化到阿里云OSS,需要进行以下步骤:

1、引入相关依赖

在项目的pom.xml文件中添加Flink OSS Connector的依赖:

<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flinkconnectoross_2.11</artifactId>
  <version>1.0.0</version>
</dependency>

2、配置OSS参数

在Flink的配置文件(如flinkconf.yaml)中添加以下OSS相关参数:

OSS Access Key ID
flink.oss.accesskeyid: <your_access_key_id>
OSS Access Key Secret
flink.oss.accesskeysecret: <your_access_key_secret>
OSS Endpoint
flink.oss.endpoint: <your_endpoint>
OSS Bucket
flink.oss.bucket: <your_bucket>
OSS Path Prefix
flink.oss.pathprefix: <your_path_prefix>
OSS File System Type
flink.oss.filesystemtype: <your_filesystem_type>

3、设置Checkpoint配置

在Flink程序中设置Checkpoint配置,将Checkpoint存储到OSS:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.checkpoint.CheckpointConfig;
import org.apache.flink.streaming.api.checkpoint.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointStorageLocation;
import org.apache.flink.streaming.api.checkpoint.CheckpointWriter;
public class FlinkCDCJob {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置Checkpoint配置
        CheckpointConfig config = env.getCheckpointConfig();
        config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        config.setCheckpointInterval(60000); // 检查点间隔为1分钟
        config.setMaxConcurrentCheckpoints(1); // 最多同时进行一个检查点
        config.setMinPauseBetweenCheckpoints(30000); // 检查点之间的最小暂停时间为30秒
        config.setCheckpointTimeout(10000); // 检查点超时时间为10秒
        config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 设置Checkpoint存储位置
        CheckpointStorageLocation storageLocation = new CheckpointStorageLocation("hdfs://localhost:9000/flink/checkpoints");
        storageLocation.setDefaultFilesystemType("oss");
        storageLocation.setFilesystemURI("oss://your_bucket/your_path_prefix");
        // 设置Checkpoint写入器
        env.setStateBackend(new RocksDBStateBackend(storageLocation));
        env.setCheckpointWriterFactory(new CheckpointWriterFactory<>(storageLocation));
        // 启动作业
        env.execute("Flink CDC Job");
    }
}

通过以上步骤,可以将Flink 1.7的checkpoint持久化到阿里云OSS。

0