哪个网站卖自己做的手工艺品,商标logo一键生成器,西安网站建设推广优化,做h网站Savepoints
1.什么是 Savepoints
Savepoint 是依据 Flink checkpointing 机制所创建的流作业执行状态的镜像#xff0c;可以使用 Savepoint 进行 Flink 作业的停止、重启或更新。
Savepoint 由两部分组成#xff1a;稳定存储#xff08;例如 HDFS#xff0c;S3#xff…Savepoints
1.什么是 Savepoints
Savepoint 是依据 Flink checkpointing 机制所创建的流作业执行状态的镜像可以使用 Savepoint 进行 Flink 作业的停止、重启或更新。
Savepoint 由两部分组成稳定存储例如 HDFSS3…) 上包含二进制文件的目录通常很大和元数据文件相对较小。
稳定存储上的文件表示作业执行状态的数据镜像。Savepoint 的元数据文件以相对路径的形式主要包含指向作为 Savepoint 的稳定存储上的所有文件的指针。
2.分配算子 ID
a概述
建议通过 uid(String) 方法手动指定算子 ID 这些 ID 将用于恢复每个算子的状态。
DataStreamString stream env.// Stateful source (e.g. Kafka) with ID.addSource(new StatefulSource()).uid(source-id) // ID for the source operator.shuffle()// Stateful mapper with ID.map(new StatefulMapper()).uid(mapper-id) // ID for the mapper// Stateless printing sink.print(); // Auto-generated ID如果不手动指定 ID 则会自动生成 ID 只要这些 ID 不变就可以从 Savepoint 自动恢复生成的 ID 取决于程序的结构并且对程序更改很敏感强烈建议手动分配这些 ID 。
bSavepoint 状态
可以将 Savepoint 想象为每个有状态的算子保存一个映射 “算子 ID -状态”
Operator ID | State
------------------------------------
source-id | State of StatefulSource
mapper-id | State of StatefulMapper示例中print sink 是无状态的因此不是 Savepoint 状态的一部分默认情况下尝试将 Savepoint 的每个条目映射回新程序。
3.算子
a概述
可以使用 命令行客户端 来触发 Savepoint触发 Savepoint 并取消作业从 Savepoint 恢复以及删除 Savepoint。
从 Flink 1.2.0 开始还可以使用 webui 从 Savepoint 恢复。
b触发 Savepoint
当触发 Savepoint 时将创建一个新的 Savepoint 目录用于存储数据和元数据可以通过配置默认目标目录或使用触发器命令指定自定义目标目录来控制该目录的位置。 注意: 目标目录必须是 JobManager(s) 和 TaskManager(s) 都可以访问的位置例如分布式文件系统或者对象存储系统上的位置。 以 FsStateBackend 或 RocksDBStateBackend 为例
# Savepoint 目标目录
/savepoint/# Savepoint 目录
/savepoint/savepoint-:shortjobid-:savepointid/# Savepoint 文件包含 Checkpoint元数据
/savepoint/savepoint-:shortjobid-:savepointid/_metadata# Savepoint 状态
/savepoint/savepoint-:shortjobid-:savepointid/...从 1.11.0 开始可以通过移动拷贝savepoint 目录到任意地方然后再进行恢复。 如下两种情况不支持 savepoint 目录的移动 1启用了 entropy injection 此时 savepoint 目录不包含所有的数据文件因为注入的路径会分散在各个路径中由于缺乏一个共同的根目录因此 savepoint 将包含绝对路径从而导致无法支持 savepoint 目录的迁移。 2作业包含了 task-owned state比如 GenericWriteAhreadLog sink。 和 savepoint 不同checkpoint 不支持任意移动文件因为 checkpoint 可能包含一些文件的绝对路径。 如果使用 MemoryStateBackend 的话metadata 和 savepoint 的数据都会保存在 _metadata 文件中。
Savepoint 格式
可以在 savepoint 的两种二进制格式之间进行选择
标准格式 - 一种在所有 state backends 间统一的格式允许使用一种状态后端创建 savepoint 后使用另一种状态后端恢复这个 savepoint。这是最稳定的格式旨在与之前的版本、模式、修改等保持最大兼容性。原生格式 - 标准格式的缺点是它的创建和恢复速度通常很慢。原生格式以特定于使用的状态后端的格式创建快照例如 RocksDB 的 SST 文件。 以原生格式创建 savepoint 的能力在 Flink 1.15 中引入在那之前 savepoint 都是以标准格式创建的。 触发 Savepoint
$ bin/flink savepoint :jobId [:targetDirectory]将触发 ID 为 :jobId 的作业的 Savepoint并返回创建的 Savepoint 路径此路径可用来恢复和删除 Savepoint 也可以指定创建 Savepoint 的格式如果没有指定会采用标准格式创建 Savepoint。
$ bin/flink savepoint --type [native/canonical] :jobId [:targetDirectory]使用上述命令触发 savepoint 时client 需要等待 savepoint 制作完成因此当任务的状态较大时可能会导致 client 出现超时的情况可以使用 detach 模式来触发savepoint。
$ bin/flink savepoint :jobId [:targetDirectory] -detached使用该命令时client 拿到本次 savepoint 的 trigger id 后立即返回可以通过 REST API 来监控本次 savepoint 的制作情况。
使用 YARN 触发 Savepoint
$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId将触发 ID 为 :jobId 和 YARN 应用程序 ID :yarnAppId 的作业的 Savepoint并返回创建的 Savepoint 的路径。
使用 Savepoint 停止作业
$ bin/flink stop --type [native/canonical] --savepointPath [:targetDirectory] :jobId将自动触发 ID 为 :jobid 的作业的 Savepoint并停止该作业可以指定一个目标文件系统目录来存储 Savepoint 该目录需要能被 JobManager(s) 和 TaskManager(s) 访问可以指定创建 Savepoint 的格式如果没有指定会采用标准格式创建 Savepoint。
如果想使用 detach 模式触发 Savepoint在命令行后添加选项-detached即可。
c从 Savepoint 恢复
$ bin/flink run -s :savepointPath [:runArgs]将提交作业并指定要从中恢复的 Savepoint 可以给出 Savepoint 目录或 _metadata 文件的路径。
跳过无法映射的状态恢复
默认resume 操作将尝试将 Savepoint 的所有状态映射回要还原的程序如果删除了运算符则可以通过 --allowNonRestoredStateshort-n选项跳过无法映射到新程序的状态
Restore 模式
Restore 模式 决定了在 restore 之后谁拥有 Savepoint 或者 externalized checkpoint 的文件的所有权。
快照可被用户或者 Flink 自身拥有如果快照归用户所有Flink 不会删除其中的文件而且 Flink 不能依赖该快照中文件的存在因为它可能在 Flink 的控制之外被删除。
每种 restore 模式都有特定的用途默认的 NO_CLAIM 模式在大多数情况下是一个很好的折中方案因为它在提供明确的所有权归属的同时只给恢复后第一个 checkpoint 带来较小的代价。
可以通过如下方式指定 restore 模式
$ bin/flink run -s :savepointPath -restoreMode :mode -n [:runArgs]NO_CLAIM 默认的
在 NO_CLAIM 模式下Flink 不会接管快照的所有权它会将快照的文件置于用户的控制之中并且永远不会删除其中的任何文件该模式下可以从同一个快照上启动多个作业。
为保证 Flink 不会依赖于该快照的任何文件它会强制第一个成功的 checkpoint 为全量 checkpoint 而不是增量的仅对state.backend: rocksdb 有影响因为其他 backend 总是创建全量 checkpoint。
一旦第一个全量的 checkpoint 完成后所有后续的 checkpoint 会照常创建当第一个 checkpoint 成功制作就可以删除原快照在此之前不能删除原快照因为没有任何完成的 checkpointFlink 会在故障时尝试从初始的快照恢复。 CLAIM
CLAIM 模式下 Flink 将声称拥有快照的所有权并且本质上将其作为 checkpoint 对待控制其生命周期并且可能会在其永远不会被用于恢复的时候删除它手动删除快照和从同一个快照上启动两个作业都是不安全的Flink 会保持配置数量的 checkpoint。 注意 Retained checkpoints 被存储在 //chk- 的目录中。Flink 不会接管 / 目录的所有权而只会接管 chk- 的所有权。Flink 不会删除旧作业的目录。Native 格式支持增量的 RocksDB savepoints。对于这些 savepointsFlink 将所有 SST 存储在 savepoints 目录中。即这些 savepoints 是自包含和目录可移动的。然而在 CLAIM 模式下恢复时后续的 checkpoints 可能会复用一些 SST 文件这反过来会阻止在 savepoints 被清理时删除 savepoints 目录。 Flink 之后运行期间可能会删除复用的SST 文件但不会删除 savepoints 目录。因此如果在 CLAIM 模式下恢复Flink 可能会留下一个空的 savepoints 目录。 LEGACY (已废弃)
Legacy 模式是 Flink 在 1.15 之前的工作方式。该模式下 Flink 永远不会删除初始恢复的 checkpoint。同时用户也不清楚是否可以删除它原因是 Flink 会在用来恢复的 checkpoint 之上创建增量的 checkpoint因此后续的 checkpoint 都有可能会依赖于用于恢复的那个 checkpoint即恢复的 checkpoint 的所有权没有明确的界定。 注意: LEGACY 模式已经被废弃在 Flink 2.0 版本将会被移除。请使用 CLAIM 或 NO_CLAIM 模式。
d删除 Savepoint
$ bin/flink savepoint -d :savepointPath删除存储在 :savepointPath 中的 Savepoint。
注意还可以通过常规文件系统操作手动删除 Savepoint 而不会影响其他 Savepoint 或 Checkpoint。
e配置
可以通过 state.savepoints.dir 配置 savepoint 的默认目录触发 savepoint 时将使用此目录来存储 savepoint可以通过使用触发器命令指定自定义目标目录来覆盖缺省值。
# 默认 Savepoint 目标目录
state.savepoints.dir: hdfs:///flink/savepoints如果既未配置缺省值也未指定自定义目标目录则触发 Savepoint 将失败。 注意: 目标目录必须是 JobManager(s) 和 TaskManager(s) 可访问的位置例如分布式文件系统上的位置。 4.F.A.Q
应该为作业中的所有算子分配 ID 吗?
根据经验是的。 严格来说仅通过 uid 方法给有状态算子分配 ID 就足够了。Savepoint 仅包含这些有状态算子的状态无状态算子不是 Savepoint 的一部分。
在实践中建议给所有算子分配 ID因为 Flink 的一些内置算子如 Window 算子也是有状态的而内置算子是否有状态并不很明显。 如果完全确定算子是无状态的则可以跳过 uid 方法。
如果在作业中添加一个需要状态的新算子会发生什么
当向作业添加新算子时它将在没有任何状态的情况下进行初始化。 Savepoint 包含每个有状态算子的状态。 无状态算子根本不是 Savepoint 的一部分。 新算子的行为类似于无状态算子。
如果从作业中删除有状态的算子会发生什么?
默认情况下从 Savepoint 恢复时将尝试将所有状态分配给新作业。如果有状态算子被删除则无法从 Savepoint 恢复。
可以通过使用 run 命令设置 --allowNonRestoredState (简称-n )来允许删除有状态算子:
$ bin/flink run -s :savepointPath -n [:runArgs]如果在作业中重新排序有状态算子会发生什么?
如果给这些算子分配了 ID它们将像往常一样恢复。
如果没有分配 ID 则有状态操作符自动生成的 ID 很可能在重新排序后发生更改。这将导致无法从以前的 Savepoint 恢复。
如果我添加、删除或重新排序作业中没有状态的算子会发生什么?
如果将 ID 分配给有状态操作符则无状态操作符不会影响 Savepoint 恢复。
如果没有分配 ID 则有状态操作符自动生成的 ID 很可能在重新排序后发生更改。这将导致无法从以前的Savepoint 恢复。
当在恢复时改变程序的并行度时会发生什么?
如果 Savepoint 是用 Flink 1.2.0 触发的并且没有使用像 Checkpointed 这样不推荐的状态 API那么可以简单地从 Savepoint 恢复程序并指定新的并行度。
如果正在从 Flink 1.2.0 触发的 Savepoint 恢复或者使用现在已经废弃的 api那么首先必须将作业和 Savepoint 迁移到 Flink 1.2.0然后才能更改并行度。
可以将 savepoint 文件移动到稳定存储上吗?
从 Flink 1.11.0 版本开始savepoint 是自包含的可以按需迁移 savepoint 文件后进行恢复。