
Flink Blob Analysis and Usage

Posted by 数栈君 on 2023-02-23 17:23

Background

In ChunJun, the plugin jars a job depends on are registered via env.registerCachedFile. This method implements Flink's distributed cache: the client registers files when building the job, and on the task side (typically in an open method) the files are retrieved for the task to use. Registering plugin jars this way, however, clearly does not serve that intended purpose.
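For reference, here is a minimal sketch of what the distributed cache is actually designed for; the file path, symbolic name, and the tiny pipeline are illustrative only:

import java.io.File;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CachedFileExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // client side: register the file under the symbolic name "dict"
        env.registerCachedFile("hdfs:///tmp/dict.txt", "dict");

        env.fromElements("a", "b")
                .map(new RichMapFunction<String, String>() {
                    private transient File dict;

                    @Override
                    public void open(Configuration parameters) {
                        // task side: the TaskManager has already copied the file locally
                        dict = getRuntimeContext().getDistributedCache().getFile("dict");
                    }

                    @Override
                    public String map(String value) {
                        return value + ":" + dict.length();
                    }
                })
                .print();

        env.execute("cached-file-demo");
    }
}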

Shipfiles and Blobs

For Flink on YARN jobs, shipfiles work like this: a jar file is added to the task's classpath (i.e. its runtime environment), while an ordinary file is placed in the task's resource directory; this distribution is implemented by YARN itself. But what if Flink does not run under such a scheduler, e.g. in standalone mode? Who then handles the interaction with the file system? The answer is Flink's blob mechanism.
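As a hedged sketch of the shipfile side (assuming YarnClusterDescriptor's addShipFiles API from the Flink 1.12 line; the plugin path is illustrative):

import java.io.File;
import java.util.Collections;

import org.apache.flink.yarn.YarnClusterDescriptor;

public final class ShipFileExample {
    /** Ship one plugin jar with the YARN application so that YARN, not the
     *  blob server, distributes it to the containers. */
    public static void shipPlugin(YarnClusterDescriptor descriptor) {
        descriptor.addShipFiles(
                Collections.singletonList(new File("/opt/chunjun/plugins/mysql-reader.jar")));
    }
}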

If both mechanisms are used, the same files are inevitably uploaded and downloaded twice. This is exactly what happens when FlinkX is used together with engine-plugins:

1. For YARN session jobs, engine-plugins has already shipped all plugin jars as shipfiles, while FlinkX additionally registers them in the distributed cache.

2. For YARN per-job jobs, YarnClusterDescriptor ships the required plugins as shipfiles, while FlinkX again registers them in the distributed cache.

The fix: for Flink on YARN jobs, engine-plugins clears the entries registered by FlinkX, avoiding the duplicate upload and download.
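A hedged sketch of that cleanup, not the actual engine-plugins code; it relies on JobGraph#getUserArtifacts returning the mutable map of registered cache entries:

import org.apache.flink.runtime.jobgraph.JobGraph;

public final class YarnSubmitCleanup {
    /** Before an on-YARN submission, drop the cached-file entries FlinkX
     *  registered: the same plugin jars already travel as YARN shipfiles, so
     *  uploading them again through the blob server would be redundant. */
    public static void clearRegisteredCacheFiles(JobGraph jobGraph) {
        jobGraph.getUserArtifacts().clear();
    }
}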

Flink Blob Architecture

https://cdn.nlark.com/yuque/0/2022/png/393929/1662004758242-3a4598cf-0ede-4755-96e6-a76edb2a9bae.png

Blob File Construction

PackagedProgramUtils#createJobGraph

public static JobGraph createJobGraph(
        PackagedProgram packagedProgram,
        Configuration configuration,
        int defaultParallelism,
        @Nullable JobID jobID,
        boolean suppressOutput)
        throws ProgramInvocationException {
    final JobGraph jobGraph;
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    final Pipeline pipeline =
            getPipelineFromProgram(packagedProgram, configuration, defaultParallelism, suppressOutput);
    try {
        Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
        jobGraph = FlinkPipelineTranslationUtil.getJobGraph(pipeline, configuration, defaultParallelism);
    } finally {
        Thread.currentThread().setContextClassLoader(contextClassLoader);
    }
    if (jobID != null) {
        jobGraph.setJobID(jobID);
    }
    jobGraph.addJars(packagedProgram.getJobJarAndDependencies());
    jobGraph.setClasspaths(packagedProgram.getClasspaths());
    jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());

    return jobGraph;
}
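For orientation, a hedged usage sketch of this entry point (jar path and parallelism are illustrative; API shapes as in the Flink 1.12 line). The user jars recorded on the resulting JobGraph are exactly what later becomes blob uploads:

import java.io.File;

import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class CreateJobGraphExample {
    public static void main(String[] args) throws Exception {
        // wrap the user jar; the builder extracts its main class and dependencies
        PackagedProgram program =
                PackagedProgram.newBuilder()
                        .setJarFile(new File("/opt/jobs/wordcount.jar"))
                        .build();
        // translate to a JobGraph: parallelism 1, generated JobID, output suppressed
        JobGraph jobGraph =
                PackagedProgramUtils.createJobGraph(program, new Configuration(), 1, null, false);
        System.out.println("user jars: " + jobGraph.getUserJars());
    }
}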

RestClusterClient#submitJob

1. Serialize the JobGraph to a local temp file.

2. Collect the FileUploads (JobGraph file / user jars / user artifacts) and build the request body.

3. Send the submission request to the JobManager, uploading the files along with it.

public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
    CompletableFuture<java.nio.file.Path> jobGraphFileFuture =
            CompletableFuture.supplyAsync(
                    () -> {
                        try {
                            final java.nio.file.Path jobGraphFile =
                                    Files.createTempFile("flink-jobgraph", ".bin");
                            try (ObjectOutputStream objectOut =
                                    new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
                                objectOut.writeObject(jobGraph);
                            }
                            return jobGraphFile;
                        } catch (IOException e) {
                            throw new CompletionException(
                                    new FlinkException("Failed to serialize JobGraph.", e));
                        }
                    },
                    executorService);

    CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture =
            jobGraphFileFuture.thenApply(
                    jobGraphFile -> {
                        List<String> jarFileNames = new ArrayList<>(8);
                        List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames =
                                new ArrayList<>(8);
                        Collection<FileUpload> filesToUpload = new ArrayList<>(8);
                        filesToUpload.add(
                                new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
                        for (Path jar : jobGraph.getUserJars()) {
                            jarFileNames.add(jar.getName());
                            filesToUpload.add(
                                    new FileUpload(
                                            Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
                        }
                        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> artifacts :
                                jobGraph.getUserArtifacts().entrySet()) {
                            final Path artifactFilePath = new Path(artifacts.getValue().filePath);
                            try {
                                // Only local artifacts need to be uploaded.
                                if (!artifactFilePath.getFileSystem().isDistributedFS()) {
                                    artifactFileNames.add(
                                            new JobSubmitRequestBody.DistributedCacheFile(
                                                    artifacts.getKey(), artifactFilePath.getName()));
                                    filesToUpload.add(
                                            new FileUpload(
                                                    Paths.get(artifacts.getValue().filePath),
                                                    RestConstants.CONTENT_TYPE_BINARY));
                                }
                            } catch (IOException e) {
                                throw new CompletionException(
                                        new FlinkException(
                                                "Failed to get the FileSystem of artifact "
                                                        + artifactFilePath + ".", e));
                            }
                        }

                        final JobSubmitRequestBody requestBody =
                                new JobSubmitRequestBody(
                                        jobGraphFile.getFileName().toString(),
                                        jarFileNames,
                                        artifactFileNames);

                        return Tuple2.of(
                                requestBody, Collections.unmodifiableCollection(filesToUpload));
                    });

    final CompletableFuture<JobSubmitResponseBody> submissionFuture =
            requestFuture.thenCompose(
                    requestAndFileUploads ->
                            sendRetriableRequest(
                                    JobSubmitHeaders.getInstance(),
                                    EmptyMessageParameters.getInstance(),
                                    requestAndFileUploads.f0,
                                    requestAndFileUploads.f1,
                                    isConnectionProblemOrServiceUnavailable()));
    ...
}
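For context, a minimal client-side sketch of driving this method (assumes a session cluster reachable at localhost:8081; jobGraph is the graph built earlier):

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class SubmitExample {
    public static void submit(JobGraph jobGraph) throws Exception {
        Configuration conf = new Configuration();
        conf.setString(RestOptions.ADDRESS, "localhost");
        conf.setInteger(RestOptions.PORT, 8081);
        try (RestClusterClient<StandaloneClusterId> client =
                new RestClusterClient<>(conf, StandaloneClusterId.getInstance())) {
            // serializes the JobGraph and uploads it plus jars/artifacts via REST multipart
            JobID jobId = client.submitJob(jobGraph).get();
            System.out.println("Submitted job " + jobId);
        }
    }
}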

Blob File Upload

JobSubmitHandler#handleRequest

protected CompletableFuture<JobSubmitResponseBody> handleRequest(
        @Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request,
        @Nonnull DispatcherGateway gateway)
        throws RestHandlerException {
    final Collection<File> uploadedFiles = request.getUploadedFiles();
    final Map<String, Path> nameToFile =
            uploadedFiles.stream()
                    .collect(Collectors.toMap(File::getName, Path::fromLocalFile));

    if (uploadedFiles.size() != nameToFile.size()) {
        throw new RestHandlerException(
                String.format(
                        "The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
                        uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
                        nameToFile.size(),
                        uploadedFiles.size()),
                HttpResponseStatus.BAD_REQUEST);
    }

    final JobSubmitRequestBody requestBody = request.getRequestBody();

    if (requestBody.jobGraphFileName == null) {
        throw new RestHandlerException(
                String.format(
                        "The %s field must not be omitted or be null.",
                        JobSubmitRequestBody.FIELD_NAME_JOB_GRAPH),
                HttpResponseStatus.BAD_REQUEST);
    }

    CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);

    Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);

    Collection<Tuple2<String, Path>> artifacts =
            getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);

    CompletableFuture<JobGraph> finalizedJobGraphFuture =
            uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);

    CompletableFuture<Acknowledge> jobSubmissionFuture =
            finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));

    return jobSubmissionFuture.thenCombine(
            jobGraphFuture,
            (ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
}

ClientUtils#uploadJobGraphFiles

public static void uploadJobGraphFiles(
        JobGraph jobGraph,
        Collection<Path> userJars,
        Collection<Tuple2<String, Path>> userArtifacts,
        SupplierWithException<BlobClient, IOException> clientSupplier)
        throws FlinkException {
    if (!userJars.isEmpty() || !userArtifacts.isEmpty()) {
        try (BlobClient client = clientSupplier.get()) {
            uploadAndSetUserJars(jobGraph, userJars, client);
            uploadAndSetUserArtifacts(jobGraph, userArtifacts, client);
        } catch (IOException ioe) {
            throw new FlinkException("Could not upload job files.", ioe);
        }
    }
    jobGraph.writeUserArtifactEntriesToConfiguration();
}

// Concrete implementation: the BlobClient streams each file to the BlobServer.
private static Collection<Tuple2<String, PermanentBlobKey>> uploadUserArtifacts(
        JobID jobID, Collection<Tuple2<String, Path>> userArtifacts, BlobClient blobClient)
        throws IOException {
    Collection<Tuple2<String, PermanentBlobKey>> blobKeys =
            new ArrayList<>(userArtifacts.size());
    for (Tuple2<String, Path> userArtifact : userArtifacts) {
        // only upload local files
        if (!userArtifact.f1.getFileSystem().isDistributedFS()) {
            final PermanentBlobKey blobKey = blobClient.uploadFile(jobID, userArtifact.f1);
            blobKeys.add(Tuple2.of(userArtifact.f0, blobKey));
        }
    }
    return blobKeys;
}

BlobServer#putPermanent

@Override
public PermanentBlobKey putPermanent(JobID jobId, InputStream inputStream) throws IOException {
    checkNotNull(jobId);
    return (PermanentBlobKey) putInputStream(jobId, inputStream, PERMANENT_BLOB);
}

// 1. Move the local staging file into the blob store directory;
// 2. upload permanent blobs to the HA BlobStore (e.g. HDFS) so jobs can be recovered.
private BlobKey putInputStream(
        @Nullable JobID jobId, InputStream inputStream, BlobKey.BlobType blobType)
        throws IOException {

    File incomingFile = createTemporaryFilename();
    BlobKey blobKey = null;
    try {
        MessageDigest md = writeStreamToFileAndCreateDigest(inputStream, incomingFile);

        // persist file
        blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);

        return blobKey;
    } finally {
        // delete incomingFile from a failed download
        if (!incomingFile.delete() && incomingFile.exists()) {
            LOG.warn(
                    "Could not delete the staging file {} for blob key {} and job {}.",
                    incomingFile,
                    blobKey,
                    jobId);
        }
    }
}
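The key point is that blob keys are content-addressed: writeStreamToFileAndCreateDigest hashes the bytes while writing them, and the digest becomes part of the BlobKey. A standalone illustration of that one-pass pattern (not Flink code; the SHA-1 choice mirrors Flink's hashing algorithm):

import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;

public final class DigestingCopy {
    /** Writes the stream to {@code target} and returns the digest of its bytes,
     *  mirroring the write-and-hash-in-one-pass idea of the BlobServer. */
    public static byte[] copyAndDigest(InputStream in, Path target) throws Exception {
        MessageDigest md = MessageDigest.getInstance("SHA-1");
        try (OutputStream out = Files.newOutputStream(target)) {
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                md.update(buf, 0, n); // hash exactly the bytes that get written
                out.write(buf, 0, n);
            }
        }
        return md.digest(); // in Flink this digest becomes part of the BlobKey
    }
}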

static void moveTempFileToStore(
        File incomingFile,
        @Nullable JobID jobId,
        BlobKey blobKey,
        File storageFile,
        Logger log,
        @Nullable BlobStore blobStore)
        throws IOException {

    try {
        // first check whether the file already exists
        if (!storageFile.exists()) {
            try {
                // only move the file if it does not yet exist
                Files.move(incomingFile.toPath(), storageFile.toPath());

                incomingFile = null;

            } catch (FileAlreadyExistsException ignored) {
                ...
            }

            if (blobStore != null) {
                // only the one moving the incoming file to its final destination
                // is allowed to upload the file to the blob store
                blobStore.put(storageFile, jobId, blobKey);
            }
        } else {
            log.warn(
                    "File upload for an existing file with key {} for job {}. This may indicate a duplicate upload or a hash collision. Ignoring newest upload.",
                    blobKey,
                    jobId);
        }
        storageFile = null;
    } finally {
        ...
    }
}

Blob File Download

PermanentBlobCache#getFile

protected File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOException {
    checkArgument(blobKey != null, "BLOB key cannot be null.");

    final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
    readWriteLock.readLock().lock();

    try {
        if (localFile.exists()) {
            return localFile;
        }
    } finally {
        readWriteLock.readLock().unlock();
    }

    // first try the distributed blob store (if available)
    // use a temporary file (thread-safe without locking)
    File incomingFile = createTemporaryFilename();
    try {
        try {
            // download from the (HA) BlobStore
            if (blobView.get(jobId, blobKey, incomingFile)) {
                // now move the temp file to our local cache atomically
                readWriteLock.writeLock().lock();
                try {
                    BlobUtils.moveTempFileToStore(
                            incomingFile, jobId, blobKey, localFile, log, null);
                } finally {
                    readWriteLock.writeLock().unlock();
                }

                return localFile;
            }
        } catch (Exception e) {
            log.info(
                    "Failed to copy from blob store. Downloading from BLOB server instead.", e);
        }

        // fallback: download from the BlobServer directly
        ...
        return localFile;
    } finally {
        ...
    }
}
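A small usage sketch of the download path from a consumer's perspective (the service and key variables are illustrative):

import java.io.File;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.PermanentBlobService;

public final class BlobLookupExample {
    /** Resolves a permanent blob to a local file: local cache first, then the
     *  HA BlobStore, then a direct download from the BlobServer. */
    public static File resolve(
            PermanentBlobService blobService, JobID jobId, PermanentBlobKey key) throws Exception {
        return blobService.getFile(jobId, key);
    }
}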

Blob File Usage

https://cdn.nlark.com/yuque/0/2022/png/393929/1662970883334-89857396-ed7e-4455-a099-b579846b85fd.png

Creating the classloader

The user jars and classpath entries are wired into the user-code classloader:

private URLClassLoader createUserCodeClassLoader(
        JobID jobId,
        Collection<PermanentBlobKey> requiredJarFiles,
        Collection<URL> requiredClasspaths)
        throws IOException {
    try {
        final URL[] libraryURLs = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
        int count = 0;
        // add URLs to locally cached JAR files
        for (PermanentBlobKey key : requiredJarFiles) {
            libraryURLs[count] = blobService.getFile(jobId, key).toURI().toURL();
            ++count;
        }

        // add classpaths
        for (URL url : requiredClasspaths) {
            libraryURLs[count] = url;
            ++count;
        }

        return classLoaderFactory.createClassLoader(libraryURLs);
    } catch (Exception e) {
        // rethrow or wrap
    }
}

userArtifacts files

org.apache.flink.runtime.taskmanager.Task#doRun

try {
    for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
            DistributedCache.readFileInfoFromConfig(jobConfiguration)) {
        LOG.info("Obtaining local cache file for '{}'.", entry.getKey());
        Future<Path> cp =
                fileCache.createTmpFile(
                        entry.getKey(), entry.getValue(), jobId, executionId);
        distributedCacheEntries.put(entry.getKey(), cp);
    }
} catch (Exception e) {
    ...
}

public Future<Path> createTmpFile(
        String name, DistributedCacheEntry entry, JobID jobID, ExecutionAttemptID executionId)
        throws Exception {
    synchronized (lock) {
        Map<String, Future<Path>> jobEntries = entries.computeIfAbsent(jobID, k -> new HashMap<>());

        // register reference holder
        final Set<ExecutionAttemptID> refHolders =
                jobRefHolders.computeIfAbsent(jobID, id -> new HashSet<>());
        refHolders.add(executionId);

        Future<Path> fileEntry = jobEntries.get(name);
        if (fileEntry != null) {
            // file is already in the cache. return a future that
            // immediately returns the file
            return fileEntry;
        } else {
            // need to copy the file
            // create the target path
            File tempDirToUse = new File(storageDirectories[nextDirectory++], jobID.toString());
            if (nextDirectory >= storageDirectories.length) {
                nextDirectory = 0;
            }

            // kick off the copying
            Callable<Path> cp;
            if (entry.blobKey != null) {
                cp = new CopyFromBlobProcess(entry, jobID, blobService, new Path(tempDirToUse.getAbsolutePath()));
            } else {
                // note: copies asynchronously from DFS (e.g. HDFS) into the TaskManager's local directory
                cp = new CopyFromDFSProcess(entry, new Path(tempDirToUse.getAbsolutePath()));
            }
            FutureTask<Path> copyTask = new FutureTask<>(cp);
            executorService.submit(copyTask);

            // store our entry
            jobEntries.put(name, copyTask);
            return copyTask;
        }
    }
}

The resolved distributedCacheEntries are then handed to the task's RuntimeEnvironment, through which user code retrieves them via getDistributedCache():

Environment env =
        new RuntimeEnvironment(
                jobId,
                vertexId,
                executionId,
                executionConfig,
                taskInfo,
                jobConfiguration,
                taskConfiguration,
                userCodeClassLoader,
                memoryManager,
                ioManager,
                broadcastVariableManager,
                taskStateManager,
                aggregateManager,
                accumulatorRegistry,
                kvStateRegistry,
                inputSplitProvider,
                distributedCacheEntries,
                consumableNotifyingPartitionWriters,
                inputGates,
                taskEventDispatcher,
                checkpointResponder,
                operatorCoordinatorEventGateway,
                taskManagerConfig,
                metrics,
                this,
                externalResourceInfoProvider);

Summary

1. The user jars and classpaths set when FlinkX or engine-plugins builds the JobGraph end up in the task's classloader.

2. The plugins registered via env.registerCachedFile are never used afterwards at runtime, so their upload and download is pure overhead.

3. Under YARN-based scheduling, shipfiles already serve the purpose described in point 1, so setting user jars and classpaths is likewise duplicated work.

4. Follow-up question: if user jars and classpaths bypassed shipfiles and only the plugins a job actually needs were uploaded, could classloading isolation be achieved? A sketch of the idea follows.
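A hedged sketch of what such isolation could look like (illustrative only; Flink's own child-first user-code classloaders follow a related idea):

import java.net.URL;
import java.net.URLClassLoader;

public final class IsolatedPluginLoader {
    /** Each plugin gets its own classloader that only sees its own jar (plus
     *  the parent for framework classes), so two plugins can carry conflicting
     *  dependency versions without clashing. */
    public static ClassLoader forPlugin(URL pluginJar, ClassLoader frameworkLoader) {
        return new URLClassLoader(new URL[] {pluginJar}, frameworkLoader);
    }
}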


