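In chunjun, the plugin jars a job uses are registered by calling env.registerCachedFile. That method implements the distributed cache: the client registers a file, and the distributed task side retrieves it, for example in an open method, for the job to use. Registering plugin jars this way, however, clearly does not serve that purpose.
For a Flink on YARN job, a shipfile works as follows: a jar file is added to the job's classpath (that is, to the task's environment), and an ordinary file is placed in the job's resource directory. YARN implements this. If Flink does not run inside a scheduler, as in standalone mode, something else has to handle the interaction with the file system, and that is Flink's blob service.
So when both mechanisms are used, files are inevitably uploaded and downloaded twice. That is exactly what happens when flinkx is combined with engine-plugins:
1. yarn session jobs: engine-plugins has already shipped all plugins as shipfiles, and flinkx also registers the plugins in the distributed cache.
2. yarn perjob jobs: YarnClusterDescriptor ships the relevant plugins as shipfiles, and flinkx also registers the plugins in the distributed cache.
The fix is for engine-plugins to clear the entries that flinkx registered for Flink on YARN jobs, so that files are not uploaded and downloaded twice.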
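The cleanup described above can be sketched with plain Java collections. This is only an illustration, not the actual engine-plugins code: the class `CachedFileDedup`, the method `removeShippedEntries`, the name-to-path map that stands in for the registered cache entries, and the choice to match on file name are all assumptions made for the example.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: drops distributed-cache registrations that duplicate shipfiles.
final class CachedFileDedup {

    /**
     * Returns a copy of cachedFiles (entry name -> local file path) without the
     * entries whose file name is already shipped through YARN.
     * The input map is left untouched.
     */
    static Map<String, String> removeShippedEntries(
            Map<String, String> cachedFiles, Set<String> shippedFileNames) {
        Map<String, String> remaining = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : cachedFiles.entrySet()) {
            Path fileName = Paths.get(entry.getValue()).getFileName();
            boolean alreadyShipped =
                    fileName != null && shippedFileNames.contains(fileName.toString());
            if (!alreadyShipped) {
                remaining.put(entry.getKey(), entry.getValue());
            }
        }
        return remaining;
    }
}
```

Only the entries that are not covered by a shipfile stay registered, so an ordinary cached file such as a job configuration is still distributed through the blob service.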
public static JobGraph createJobGraph(
PackagedProgram packagedProgram,
Configuration configuration,
int defaultParallelism,
@Nullable JobID jobID,
boolean suppressOutput) throws ProgramInvocationException {
final JobGraph jobGraph;
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
final Pipeline pipeline = getPipelineFromProgram(packagedProgram, configuration, defaultParallelism, suppressOutput);
try {
Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
jobGraph = FlinkPipelineTranslationUtil.getJobGraph(pipeline, configuration, defaultParallelism);
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
if (jobID != null) {
jobGraph.setJobID(jobID);
}
jobGraph.addJars(packagedProgram.getJobJarAndDependencies());
jobGraph.setClasspaths(packagedProgram.getClasspaths());
jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
return jobGraph;
}
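1. Serialize the JobGraph to a local file
2. Set up the fileUpload entries (jobgraph/userjar/userArtifacts) and build the requestBody
3. Send the request to the JobManager to submit the job and upload the files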
public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
CompletableFuture<java.nio.file.Path> jobGraphFileFuture = CompletableFuture.supplyAsync(() -> {
try {
final java.nio.file.Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
objectOut.writeObject(jobGraph);
}
return jobGraphFile;
} catch (IOException e) {
throw new CompletionException(new FlinkException("Failed to serialize JobGraph.", e));
}
}, executorService);
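CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture = jobGraphFileFuture.thenApply(jobGraphFile -> {
List<String> jarFileNames = new ArrayList<>(8);
List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames = new ArrayList<>(8);
Collection<FileUpload> filesToUpload = new ArrayList<>(8);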
filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
for (Path jar : jobGraph.getUserJars()) {
jarFileNames.add(jar.getName());
filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
}
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> artifacts : jobGraph.getUserArtifacts().entrySet()) {
final Path artifactFilePath = new Path(artifacts.getValue().filePath);
try {
// Only local artifacts need to be uploaded.
if (!artifactFilePath.getFileSystem().isDistributedFS()) {
artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), artifactFilePath.getName()));
filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY));
}
} catch (IOException e) {
throw new CompletionException(
new FlinkException("Failed to get the FileSystem of artifact " + artifactFilePath + ".", e));
}
}
final JobSubmitRequestBody requestBody = new JobSubmitRequestBody(
jobGraphFile.getFileName().toString(),
jarFileNames,
artifactFileNames);
return Tuple2.of(requestBody, Collections.unmodifiableCollection(filesToUpload));
});
final CompletableFuture<JobSubmitResponseBody> submissionFuture = requestFuture.thenCompose(
requestAndFileUploads -> sendRetriableRequest(
JobSubmitHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
requestAndFileUploads.f0,
requestAndFileUploads.f1,
isConnectionProblemOrServiceUnavailable())
);
// ... (the rest of the method is omitted in this excerpt)
}
protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
final Collection<File> uploadedFiles = request.getUploadedFiles();
final Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
File::getName,
Path::fromLocalFile
));
if (uploadedFiles.size() != nameToFile.size()) {
throw new RestHandlerException(
String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
nameToFile.size(),
uploadedFiles.size()),
HttpResponseStatus.BAD_REQUEST
);
}
final JobSubmitRequestBody requestBody = request.getRequestBody();
if (requestBody.jobGraphFileName == null) {
throw new RestHandlerException(
String.format("The %s field must not be omitted or be null.",
JobSubmitRequestBody.FIELD_NAME_JOB_GRAPH),
HttpResponseStatus.BAD_REQUEST);
}
CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);
Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);
Collection<Tuple2<String, Path>> artifacts = getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);
CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);
CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));
return jobSubmissionFuture.thenCombine(jobGraphFuture,
(ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
}
public static void uploadJobGraphFiles(
JobGraph jobGraph,
Collection<Path> userJars,
Collection<Tuple2<String, Path>> userArtifacts,
SupplierWithException<BlobClient, IOException> clientSupplier)
throws FlinkException {
if (!userJars.isEmpty() || !userArtifacts.isEmpty()) {
try (BlobClient client = clientSupplier.get()) {
uploadAndSetUserJars(jobGraph, userJars, client);
uploadAndSetUserArtifacts(jobGraph, userArtifacts, client);
} catch (IOException ioe) {
throw new FlinkException("Could not upload job files.", ioe);
}
}
jobGraph.writeUserArtifactEntriesToConfiguration();
}
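// Actual implementation: the BlobClient sends the files to the BlobServer
private static Collection<Tuple2<String, PermanentBlobKey>> uploadUserArtifacts(
JobID jobID, Collection<Tuple2<String, Path>> userArtifacts, BlobClient blobClient)
throws IOException {
Collection<Tuple2<String, PermanentBlobKey>> blobKeys =
new ArrayList<>(userArtifacts.size());
for (Tuple2<String, Path> userArtifact : userArtifacts) {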
// only upload local files
if (!userArtifact.f1.getFileSystem().isDistributedFS()) {
final PermanentBlobKey blobKey = blobClient.uploadFile(jobID, userArtifact.f1);
blobKeys.add(Tuple2.of(userArtifact.f0, blobKey));
}
}
return blobKeys;
}
@Override
public PermanentBlobKey putPermanent(JobID jobId, InputStream inputStream) throws IOException {
checkNotNull(jobId);
return (PermanentBlobKey) putInputStream(jobId, inputStream, PERMANENT_BLOB);
}
// 1. Move the local file; 2. upload the permanent blob to HDFS so the job can be recovered
private BlobKey putInputStream(
@Nullable JobID jobId, InputStream inputStream, BlobKey.BlobType blobType)
throws IOException {
File incomingFile = createTemporaryFilename();
BlobKey blobKey = null;
try {
MessageDigest md = writeStreamToFileAndCreateDigest(inputStream, incomingFile);
// persist file
blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);
return blobKey;
} finally {
// delete incomingFile from a failed download
if (!incomingFile.delete() && incomingFile.exists()) {
LOG.warn(
"Could not delete the staging file {} for blob key {} and job {}.",
incomingFile,
blobKey,
jobId);
}
}
}
static void moveTempFileToStore(
File incomingFile,
@Nullable JobID jobId,
BlobKey blobKey,
File storageFile,
Logger log,
@Nullable BlobStore blobStore)
throws IOException {
try {
// first check whether the file already exists
if (!storageFile.exists()) {
try {
// only move the file if it does not yet exist
Files.move(incomingFile.toPath(), storageFile.toPath());
incomingFile = null;
} catch (FileAlreadyExistsException ignored) {
...
}
if (blobStore != null) {
// only the one moving the incoming file to its final destination is allowed to
// upload the
// file to the blob store
blobStore.put(storageFile, jobId, blobKey);
}
} else {
log.warn(
"File upload for an existing file with key {} for job {}. This may indicate a duplicate upload or a hash collision. Ignoring newest upload.",
blobKey,
jobId);
}
storageFile = null;
} finally {
...
}
}
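The store-if-absent behaviour of putInputStream and moveTempFileToStore above can be illustrated outside Flink with a small content-addressed store. This is a simplified sketch, not Flink's implementation: `ContentAddressedStore` is a hypothetical class, the key here is just the hex SHA-1 digest of the content (a real BlobKey carries more than the hash), and the upload to a distributed BlobStore is left out.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical, simplified stand-in for the local part of a blob store.
final class ContentAddressedStore {
    private final Path storageDir;

    ContentAddressedStore(Path storageDir) {
        this.storageDir = storageDir;
    }

    /** Writes the stream to a staging file, then moves it to a file named after its digest. */
    String put(InputStream in) throws IOException {
        Path incoming = Files.createTempFile(storageDir, "incoming-", ".tmp");
        try {
            MessageDigest md = newDigest();
            try (OutputStream out = Files.newOutputStream(incoming)) {
                byte[] buffer = new byte[8192];
                int read;
                while ((read = in.read(buffer)) != -1) {
                    md.update(buffer, 0, read); // hash while copying
                    out.write(buffer, 0, read);
                }
            }
            String key = toHex(md.digest());
            Path target = storageDir.resolve(key);
            if (!Files.exists(target)) {
                try {
                    // only move the file if it does not exist yet
                    Files.move(incoming, target);
                } catch (FileAlreadyExistsException ignored) {
                    // a concurrent writer stored the same content first
                }
            }
            return key;
        } finally {
            // removes the staging file if it was not moved
            Files.deleteIfExists(incoming);
        }
    }

    private static MessageDigest newDigest() {
        try {
            return MessageDigest.getInstance("SHA-1");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 is not available", e);
        }
    }

    private static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }
}
```

Storing the same bytes twice yields the same key and a single file on disk, which is why a second upload of an identical plugin jar is wasted transfer rather than wasted storage.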
protected File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOException {
checkArgument(blobKey != null, "BLOB key cannot be null.");
final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
readWriteLock.readLock().lock();
try {
if (localFile.exists()) {
return localFile;
}
} finally {
readWriteLock.readLock().unlock();
}
// first try the distributed blob store (if available)
// use a temporary file (thread-safe without locking)
File incomingFile = createTemporaryFilename();
try {
try {
// download from the BlobStore
if (blobView.get(jobId, blobKey, incomingFile)) {
// now move the temp file to our local cache atomically
readWriteLock.writeLock().lock();
try {
BlobUtils.moveTempFileToStore(
incomingFile, jobId, blobKey, localFile, log, null);
} finally {
readWriteLock.writeLock().unlock();
}
return localFile;
}
} catch (Exception e) {
log.info(
"Failed to copy from blob store. Downloading from BLOB server instead.", e);
}
// fallback download path, used when the BlobStore copy fails
...
return localFile;
} finally {
...
}
}
The userJar and classPath entries are added to the classloader:
private URLClassLoader createUserCodeClassLoader(
JobID jobId,
Collection<PermanentBlobKey> requiredJarFiles,
Collection<URL> requiredClasspaths)
throws IOException {
try {
final URL[] libraryURLs =
new URL[requiredJarFiles.size() + requiredClasspaths.size()];
int count = 0;
// add URLs to locally cached JAR files
for (PermanentBlobKey key : requiredJarFiles) {
libraryURLs[count] = blobService.getFile(jobId, key).toURI().toURL();
++count;
}
// add classpaths
for (URL url : requiredClasspaths) {
libraryURLs[count] = url;
++count;
}
return classLoaderFactory.createClassLoader(libraryURLs);
} catch (Exception e) {
// rethrow or wrap
}
}
org.apache.flink.runtime.taskmanager.Task#doRun
try {
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
DistributedCache.readFileInfoFromConfig(jobConfiguration)) {
LOG.info("Obtaining local cache file for '{}'.", entry.getKey());
Future<Path> cp =
fileCache.createTmpFile(
entry.getKey(), entry.getValue(), jobId, executionId);
distributedCacheEntries.put(entry.getKey(), cp);
}
} catch (Exception e) {
...
}
public Future<Path> createTmpFile(String name, DistributedCacheEntry entry, JobID jobID, ExecutionAttemptID executionId) throws Exception {
synchronized (lock) {
Map<String, Future<Path>> jobEntries = entries.computeIfAbsent(jobID, k -> new HashMap<>());
// register reference holder
final Set<ExecutionAttemptID> refHolders = jobRefHolders.computeIfAbsent(jobID, id -> new HashSet<>());
refHolders.add(executionId);
Future<Path> fileEntry = jobEntries.get(name);
if (fileEntry != null) {
// file is already in the cache. return a future that
// immediately returns the file
return fileEntry;
} else {
// need to copy the file
// create the target path
File tempDirToUse = new File(storageDirectories[nextDirectory++], jobID.toString());
if (nextDirectory >= storageDirectories.length) {
nextDirectory = 0;
}
// kick off the copying
Callable<Path> cp;
if (entry.blobKey != null) {
cp = new CopyFromBlobProcess(entry, jobID, blobService, new Path(tempDirToUse.getAbsolutePath()));
} else {
// note: asynchronously copy from HDFS into a TaskManager-local directory
cp = new CopyFromDFSProcess(entry, new Path(tempDirToUse.getAbsolutePath()));
}
FutureTask<Path> copyTask = new FutureTask<>(cp);
executorService.submit(copyTask);
// store our entry
jobEntries.put(name, copyTask);
return copyTask;
}
}
}
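The caching pattern in createTmpFile above, where the first request for a name starts the copy and later requests reuse the same future, can be reduced to a few lines of plain Java. This is a sketch for illustration only: `CopyOnceCache` and `getOrCopy` are hypothetical names, the result type is a String path instead of Flink's Path, and the per-job bookkeeping and reference counting are omitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

// Hypothetical, reduced version of the "copy once, share the future" idea.
final class CopyOnceCache {
    private final Object lock = new Object();
    private final Map<String, Future<String>> entries = new HashMap<>();
    private final ExecutorService executor;

    CopyOnceCache(ExecutorService executor) {
        this.executor = executor;
    }

    /** Returns the future for the named file, starting copyJob only on the first call. */
    Future<String> getOrCopy(String name, Callable<String> copyJob) {
        synchronized (lock) {
            Future<String> existing = entries.get(name);
            if (existing != null) {
                // the copy is already running or finished; share its result
                return existing;
            }
            FutureTask<String> copyTask = new FutureTask<>(copyJob);
            executor.submit(copyTask); // kick off the copy asynchronously
            entries.put(name, copyTask);
            return copyTask;
        }
    }
}
```

Several tasks asking for the same cached file therefore trigger one copy, but every distinct registered file, including an unused plugin jar, still costs one download per job on each TaskManager.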
Environment env =
new RuntimeEnvironment(
jobId,
vertexId,
executionId,
executionConfig,
taskInfo,
jobConfiguration,
taskConfiguration,
userCodeClassLoader,
memoryManager,
ioManager,
broadcastVariableManager,
taskStateManager,
aggregateManager,
accumulatorRegistry,
kvStateRegistry,
inputSplitProvider,
distributedCacheEntries,
consumableNotifyingPartitionWriters,
inputGates,
taskEventDispatcher,
checkpointResponder,
operatorCoordinatorEventGateway,
taskManagerConfig,
metrics,
this,
externalResourceInfoProvider);
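1. The userJar and classpath set when flinkx or engine-plugins builds the JobGraph are added to the job's classloader.
2. Plugins registered through env.registerCachedFile are never used afterwards while the job runs, so they are a redundant upload and download.
3. With YARN scheduling, a shipfile has the same effect as point 1, so setting userJar and classpath is a duplicate operation as well.
4. Further thought: if userJar and classPath did not go through shipfile, and only the plugins the job actually needs were uploaded, could class-loading isolation be achieved?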
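The idea in point 4 can be pictured as one classloader per job that sees only the jars of the plugins that job needs. The following is a minimal sketch under that assumption, not something chunjun or engine-plugins is known to do: `PluginClassLoaders` and `forPlugin` are hypothetical names, and a plain URLClassLoader is used for simplicity.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.util.List;

// Hypothetical helper: builds a classloader restricted to one job's plugin jars.
final class PluginClassLoaders {

    /** Creates a classloader whose own search path contains only the given plugin jars. */
    static URLClassLoader forPlugin(List<Path> pluginJars, ClassLoader parent)
            throws MalformedURLException {
        URL[] urls = new URL[pluginJars.size()];
        for (int i = 0; i < urls.length; i++) {
            urls[i] = pluginJars.get(i).toUri().toURL();
        }
        return new URLClassLoader(urls, parent);
    }
}
```

A URLClassLoader delegates to its parent first, so this only isolates plugins from each other as long as the plugin jars are not also on the parent classpath. That is the situation shipfiles create on YARN, which is why the question in point 4 depends on not shipping them.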