Drill-on-YARN Source Code Analysis

Author: 开心源码. Published: 2022-05-12.

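1. Overview

The previous article described how to deploy Drill on YARN, after which the Drill-on-YARN client lets you start, stop, resize, and clean the Drill cluster. But how are these commands actually executed behind the scenes? This article walks through the Drill-on-YARN source code, focusing on the start-up process and covering the remaining commands briefly.

Note: the code below is based on Drill 1.14.0 and has been trimmed to save space.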

2. Drill-on-YARN start

2.1 drill-on-yarn.sh

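Looking at the drill-on-yarn.sh script, it is easy to see that the Java class ultimately executed is given by CLIENT_CMD="$JAVA $VM_OPTS -cp $CP org.apache.drill.yarn.client.DrillOnYarn ${args[@]}". org.apache.drill.yarn.client.DrillOnYarn is therefore the entry point for starting Drill-on-YARN. Here is an overview of the class: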

public class DrillOnYarn {
  public static void main(String argv[]) {
    BasicConfigurator.configure();
    ClientContext.init();
    run(argv);
  }

  public static void run(String argv[]) {
    ClientContext context = ClientContext.instance();
    CommandLineOptions opts = new CommandLineOptions();
    if (!opts.parse(argv)) {
      opts.usage();
      context.exit(-1);
    }
    if (opts.getCommand() == null) {
      opts.usage();
      context.exit(-1);
    }
    try {
      DrillOnYarnConfig.load().setClientPaths();
    } catch (DoyConfigException e) {
      ClientContext.err.println(e.getMessage());
      context.exit(-1);
    }
    ClientCommand cmd;
    switch (opts.getCommand()) {
    case UPLOAD:
      cmd = new StartCommand(true, false);
      break;
    case START:
      cmd = new StartCommand(true, true);
      break;
    case DESCRIBE:
      cmd = new PrintConfigCommand();
      break;
    case STATUS:
      cmd = new StatusCommand();
      break;
    case STOP:
      cmd = new StopCommand();
      break;
    case CLEAN:
      cmd = new CleanCommand();
      break;
    case RESIZE:
      cmd = new ResizeCommand();
      break;
    default:
      cmd = new HelpCommand();
    }
    cmd.setOpts(opts);
    try {
      cmd.run();
    } catch (ClientException e) {
      displayError(opts, e);
      context.exit(1);
    }
  }
}

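The entry point is the main method, and the key part is the run method, which dispatches many commands. We focus on the start command (StartCommand), whose code is as follows: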

public void run() throws ClientException {
  checkExistingApp();
  dryRun = opts.dryRun;
  config = DrillOnYarnConfig.config();
  FileUploader uploader = upload();
  if (launch) {
    launch(uploader);
  }
}

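In summary, it consists of the following steps:

  1. Check whether the application already exists. If it does, the start is refused; otherwise the start proceeds. (The application checked here is the YARN application: a successful start writes the YARN applicationId to a file on the local disk, and this file is used for the check.)
  2. Upload the Drill archive and the contents of the site directory to DFS. The site directory contents are packaged as site.tar.gz.
      public void run() throws ClientException {
        setup();
        uploadDrillArchive();
        if (hasSiteDir()) {
          uploadSite();
        }
      }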
  3. Launch the ApplicationMaster. The main steps are:
  • Create the YARN client and start it
      // AMRunner#connectToYarn
      private void connectToYarn() {
        System.out.print("Loading YARN Config...");
        client = new YarnRMClient();
        System.out.println(" Loaded.");
      }
  • Create the ApplicationMaster
      // AMRunner#createApp
      private void createApp() throws ClientException {
        try {
          appResponse = client.createAppMaster();
        } catch (YarnClientException e) {
          throw new ClientException("Failed to allocate Drill application master",
              e);
        }
        appId = appResponse.getApplicationId();
        System.out.println("Application ID: " + appId.toString());
      }
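  • Set up the ApplicationMaster context, including heap memory, class path, the launch command (drill-am.sh), and the resources used to start the AM container (memory, vCores, disks)
  • Validate the resources, mainly checking whether the resources requested for the ApplicationMaster exceed the limits configured in YARN
  • Submit the ApplicationMaster
      private void launchApp(AppSpec master) throws ClientException {
        try {
          client.submitAppMaster(master);
        } catch (YarnClientException e) {
          throw new ClientException("Failed to start Drill application master", e);
        }
      }
  • Wait for start-up and print the start-up log
  • Write the ApplicationMaster's appId to a file (the file used in step 1 to check whether the application exists)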
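The file-based check used in step 1 and in the last bullet can be sketched as follows. This is a minimal illustration under stated assumptions, not Drill's implementation: the class name `AppIdFile` and its methods are hypothetical, and the real client may do more than look at the file.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

/** Hypothetical helper that records a YARN application ID in a local file. */
final class AppIdFile {
    private final Path file;

    AppIdFile(Path file) {
        this.file = file;
    }

    /** Returns the recorded application ID, or empty if none is recorded. */
    Optional<String> read() {
        if (!Files.exists(file)) {
            return Optional.empty();
        }
        try {
            String id = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
            return id.isEmpty() ? Optional.<String>empty() : Optional.of(id);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Refuses to start when an application ID is already recorded. */
    void checkNoExistingApp() {
        Optional<String> existing = read();
        if (existing.isPresent()) {
            throw new IllegalStateException("Application already exists: " + existing.get());
        }
    }

    /** Records the ID once the ApplicationMaster has been launched. */
    void write(String appId) {
        try {
            Files.write(file, appId.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A start command built on this sketch would call `checkNoExistingApp()` first and `write(appId)` last, so a second start attempt fails fast while the file is present.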

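Once the ApplicationMaster has started, it requests resources from the RM and starts the Drillbits. The next section describes what the ApplicationMaster does after start-up.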

2.2 drill-am.sh

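Looking at the drill-am.sh script, it is easy to see that the Java class ultimately executed is given by AMCMD="$JAVA $AM_JAVA_OPTS ${args[@]} -cp $CP org.apache.drill.yarn.appMaster.DrillApplicationMaster". org.apache.drill.yarn.appMaster.DrillApplicationMaster is the entry point of the ApplicationMaster. Here is an overview of the class: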

public class DrillApplicationMaster {
  public static void main(String[] args) {
    LOG.trace("Drill Application Master starting.");
    try {
      DrillOnYarnConfig.load().setAmDrillHome();
    } catch (DoyConfigException e) {
      System.err.println(e.getMessage());
      System.exit(-1);
    }
    Dispatcher dispatcher;
    try {
      dispatcher = (new DrillControllerFactory()).build();
    } catch (ControllerFactoryException e) {
      LOG.error("Setup failed, exiting: " + e.getMessage(), e);
      System.exit(-1);
      return;
    }
    try {
      if (!dispatcher.start()) {
        return;
      }
    } catch (Throwable e) {
      LOG.error("Fatal error, exiting: " + e.getMessage(), e);
      System.exit(-1);
    }
    WebServer webServer = new WebServer(dispatcher);
    try {
      webServer.start();
    } catch (Exception e) {
      LOG.error("Web server setup failed, exiting: " + e.getMessage(), e);
      System.exit(-1);
    }
    try {
      dispatcher.run();
    } catch (Throwable e) {
      LOG.error("Fatal error, exiting: " + e.getMessage(), e);
      System.exit(-1);
    } finally {
      try {
        webServer.close();
      } catch (Exception e) {
      }
    }
  }
}

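In summary, it consists of the following steps:

  1. Load the Drill-on-YARN configuration and set the AM's Drill home, for example /home/admin/tmp2/hadoop/nm-local-dir/usercache/admin/appcache/application_1534698866098_0022/container_1534698866098_0022_01_000001/drill/apache-drill-1.14.0
  2. Build the Dispatcher. The Dispatcher routes YARN, timer, and ZooKeeper events to the cluster controller. It is lightly multi-threaded: it responds to events from the RM, NM, and timer threads. Within each of these sources events arrive sequentially, so synchronization is needed across the different event types but not within a single type. The build process is as follows:
  • Prepare resources, including the DFS paths of the Drill archive and the site archive
      private Map<String, LocalResource> prepareResources() {
        ...
        drillArchivePath = drillConfig.getDrillArchiveDfsPath();
        siteArchivePath = drillConfig.getSiteArchiveDfsPath();
        ...
      }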
  • Define the task launch specification (TaskSpec), including the runtime environment, the YARN container specification, and the drillbit specification
      private TaskSpec buildDrillTaskSpec(Map<String, LocalResource> resources) throws DoyConfigException {
        ...
        ContainerRequestSpec containerSpec = new ContainerRequestSpec();
        containerSpec.memoryMb = config.getInt(DrillOnYarnConfig.DRILLBIT_MEMORY);
        ...
        LaunchSpec drillbitSpec = new LaunchSpec();
        ...
        TaskSpec taskSpec = new TaskSpec();
        taskSpec.name = "Drillbit";
        taskSpec.containerSpec = containerSpec;
        taskSpec.launchSpec = drillbitSpec;
        return taskSpec;
      }
  • Set the Dispatcher's controller. The implementation class is ClusterControllerImpl, which drives the Drill cluster through states, coordinates the cluster's tasks (starting Drill, stopping Drill, and so on), and handles container callbacks
      public void setYarn(AMYarnFacade yarn) throws YarnFacadeException {
        this.yarn = yarn;
        controller = new ClusterControllerImpl(yarn);
      }
  • Register a Scheduler with the controller, for example DrillbitScheduler. The Scheduler's configuration comes from the drill-on-yarn.conf set up earlier
      cluster: [
        {
          name: "drill-group1"
          type: "basic"
          count: 1
        }
      ]
      ...
      ClusterDef.ClusterGroup pool = ClusterDef.getCluster(config, 0);
      Scheduler testGroup = new DrillbitScheduler(pool.getName(), taskSpec,
          pool.getCount(), requestTimeoutSecs, maxExtraNodes);
      dispatcher.getController().registerScheduler(testGroup);
      ...
  • Create the ZooKeeper cluster coordinator
      String zkConnect = config.getString(DrillOnYarnConfig.ZK_CONNECT);
      String zkRoot = config.getString(DrillOnYarnConfig.ZK_ROOT);
      String clusterId = config.getString(DrillOnYarnConfig.CLUSTER_ID);
  3. Start the Dispatcher, which mainly starts AMRMClientAsync, NMClientAsync, and YarnClient
      ...
      yarn.start(new ResourceCallback(), new NodeCallback());
      String url = trackingUrl.replace("<port>", Integer.toString(httpPort));
      if (DrillOnYarnConfig.config().getBoolean(DrillOnYarnConfig.HTTP_ENABLE_SSL)) {
        url = url.replace("http:", "https:");
      }
      yarn.register(url);
      controller.started();
      ...
      ...
      resourceMgr = AMRMClientAsync.createAMRMClientAsync(pollPeriodMs, resourceCallback);
      resourceMgr.init(conf);
      resourceMgr.start();
      ...
      nodeMgr = NMClientAsync.createNMClientAsync(nodeCallback);
      nodeMgr.init(conf);
      nodeMgr.start();
      ...
      client = YarnClient.createYarnClient();
      client.init(conf);
      client.start();
      ...
  4. Start the Drill operations web UI
      WebServer webServer = new WebServer(dispatcher);
      webServer.start();
  5. Run the Dispatcher. This mainly starts a thread that continuously polls the state of the tasks in the current task queue, such as start, stop, and resize tasks, and then performs the corresponding action. Taking start as an example:
  • Add a start task and put it in the pendingTasks queue
      if (state == State.LIVE) {
        adjustTasks(curTime);
        requestContainers();
      }
  • Request a container from the RM by creating a ContainerRequest
      ContainerRequest request = containerSpec.makeRequest();
      resourceMgr.addContainerRequest(containerSpec.makeRequest());
      return request;
  • ResourceCallback listens for container allocation and then launches the container
      private class ResourceCallback implements AMRMClientAsync.CallbackHandler {
        @Override
        public void onContainersAllocated(List<Container> containers) {
          controller.containersAllocated(containers);
        }
      }
      public void containerAllocated(EventContext context, Container container) {
        Task task = context.task;
        LOG.info(task.getLabel() + " - Received container: "
            + DoYUtil.describeContainer(container));
        context.group.dequeueAllocatingTask(task);
        // No matter what happens below, we don't want to ask for this
        // container again. The RM async API is a bit bizarre in this
        // regard: it will keep asking for container over and over until
        // we tell it to stop.
        context.yarn.removeContainerRequest(task.containerRequest);
        // The container is needed both in the normal and in the cancellation
        // path, so set it here.
        task.container = container;
        if (task.cancelled) {
          context.yarn.releaseContainer(container);
          taskStartFailed(context, Disposition.CANCELLED);
          return;
        }
        task.error = null;
        task.completionStatus = null;
        transition(context, LAUNCHING);
        // The pool that manages this task wants to know that we have
        // a container. The task manager may want to do some task-
        // specific setup.
        context.group.containerAllocated(context.task);
        context.getTaskManager().allocated(context);
        // Go ahead and launch a task in the container using the launch
        // specification provided by the task group (pool).
        try {
          context.yarn.launchContainer(container, task.getLaunchSpec());
          task.launchTime = System.currentTimeMillis();
        } catch (YarnFacadeException e) {
          LOG.error("Container launch failed: " + task.getContainerId(), e);
          // This may not be the right response. RM may still think
          // we have the container if the above is a local failure.
          task.error = e;
          context.group.containerReleased(task);
          task.container = null;
          taskStartFailed(context, Disposition.LAUNCH_FAILED);
        }
      }
  • NodeCallback listens for container start-up
      public class NodeCallback implements NMClientAsync.CallbackHandler {
        @Override
        public void onStartContainerError(ContainerId containerId, Throwable t) {
          controller.taskStartFailed(containerId, t);
        }
        @Override
        public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
          controller.containerStarted(containerId);
        }
        @Override
        public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
        }
        @Override
        public void onGetContainerStatusError(ContainerId containerId, Throwable t) {
        }
        @Override
        public void onStopContainerError(ContainerId containerId, Throwable t) {
          controller.stopTaskFailed(containerId, t);
        }
        @Override
        public void onContainerStopped(ContainerId containerId) {
          controller.containerStopped(containerId);
        }
      }
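The interplay between the polling thread and the YARN callbacks in step 5 can be modeled with a small, library-free sketch. Everything here is an assumption made for illustration: `PulseLoopSketch`, its queues of integer task IDs, and the `tick`/`containerStarted` methods are hypothetical stand-ins, and only the names `adjustTasks` and `requestContainers` are borrowed from the snippet above. The methods are synchronized so that a timer tick and a callback arriving on another thread never interleave.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical model of a polling loop that keeps a target number of tasks alive. */
final class PulseLoopSketch {
    private final Queue<Integer> pending = new ArrayDeque<>();   // tasks waiting for a request
    private final Queue<Integer> requested = new ArrayDeque<>(); // container requested, not yet running
    private final Queue<Integer> running = new ArrayDeque<>();   // container started
    private final int targetCount;
    private int nextTaskId = 1;

    PulseLoopSketch(int targetCount) {
        this.targetCount = targetCount;
    }

    /** One polling tick: top up the task count, then request containers. */
    synchronized void tick() {
        adjustTasks();
        requestContainers();
    }

    /** Creates pending tasks until all queues together hold targetCount tasks. */
    private void adjustTasks() {
        int total = pending.size() + requested.size() + running.size();
        for (int i = total; i < targetCount; i++) {
            pending.add(nextTaskId++);
        }
    }

    /** Moves every pending task to the requested queue (stands in for an RM request). */
    private void requestContainers() {
        Integer taskId;
        while ((taskId = pending.poll()) != null) {
            requested.add(taskId);
        }
    }

    /** Callback path: the oldest requested task got its container and started. */
    synchronized void containerStarted() {
        Integer taskId = requested.poll();
        if (taskId != null) {
            running.add(taskId);
        }
    }

    synchronized int requestedCount() {
        return requested.size();
    }

    synchronized int runningCount() {
        return running.size();
    }
}
```

In this model a second `tick()` does not issue extra requests while earlier ones are still outstanding, because requested and running tasks both count toward the target.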

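2.3 Failover

Besides start, stop, and resize, Drill-on-YARN also provides failover: when a drillbit dies, Drill-on-YARN tries to start it again. The current retry limit is 2. In addition, if the node hosting a drillbit fails frequently, the node is added to a blacklist.

We can simulate a drillbit failure by killing the drillbit manually. After waiting a moment, we can see that the drillbit process has been restarted. The code path is as follows: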

  1. The drillbit dies and its container completes
      private class ResourceCallback implements AMRMClientAsync.CallbackHandler {
        @Override
        public void onContainersCompleted(List<ContainerStatus> statuses) {
          controller.containersCompleted(statuses);
        }
      }
  2. Retry the task: the task is added back to pendingTasks, the polling thread sees that pendingTasks is not empty, and performs the start operation
      protected void taskTerminated(EventContext context) {
        Task task = context.task;
        context.getTaskManager().completed(context);
        context.group.containerReleased(task);
        assert task.completionStatus != null;
        // Exit status 0 means the container ended normally;
        // any other status is treated as a failure and retried.
        if (task.completionStatus.getExitStatus() == 0) {
          taskEnded(context, Disposition.COMPLETED);
          context.group.taskEnded(context.task);
        } else {
          taskEnded(context, Disposition.RUN_FAILED);
          retryTask(context);
        }
      }
      private void retryTask(EventContext context) {
        Task task = context.task;
        assert task.state == END;
        if (!context.controller.isLive() || !task.retryable()) {
          context.group.taskEnded(task);
          return;
        }
        if (task.tryCount > task.taskGroup.getMaxRetries()) {
          LOG.error(task.getLabel() + " - Too many retries: " + task.tryCount);
          task.disposition = Disposition.TOO_MANY_RETRIES;
          context.group.taskEnded(task);
          return;
        }
        LOG.info(task.getLabel() + " - Retrying task, try " + task.tryCount);
        context.group.taskRetried(task);
        task.reset();
        transition(context, START);
        context.group.enqueuePendingRequest(task);
      }
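The effect of the `tryCount > getMaxRetries()` check can be traced with a small standalone model. This is a sketch under assumptions rather than Drill's code: `RetrySketch` and its nested types are hypothetical, and it assumes `tryCount` is incremented once per launch attempt, which the trimmed snippet above does not show.

```java
/** Hypothetical model of the retry decision made when a container exits. */
final class RetrySketch {
    enum Disposition { NONE, RETRYING, COMPLETED, TOO_MANY_RETRIES }

    static final class Task {
        int tryCount; // launch attempts so far (assumed to grow by one per launch)
        Disposition disposition = Disposition.NONE;
    }

    private final int maxRetries;

    RetrySketch(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /** Simulates one launch attempt. */
    void launch(Task task) {
        task.tryCount++;
    }

    /** Handles a container exit; returns true when the task should be queued again. */
    boolean onContainerExit(Task task, int exitStatus) {
        if (exitStatus == 0) {
            task.disposition = Disposition.COMPLETED;
            return false;
        }
        if (task.tryCount > maxRetries) {
            task.disposition = Disposition.TOO_MANY_RETRIES;
            return false;
        }
        task.disposition = Disposition.RETRYING;
        return true;
    }
}
```

With a limit of 2, a task that keeps failing is launched three times in this model (the first attempt plus two retries) before it ends as TOO_MANY_RETRIES.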

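3. Stop

Besides the start command described above, Drill-on-YARN also provides a stop command, which comes in two forms:

  1. Forced stop: directly call the YARN client's killApplication API: yarnClient.killApplication(appId);
  2. Graceful stop: first cancel all tasks, both pending and running, then call the YARN API to kill the containers, shut down the controller, and finally report that the AM has finished by unregistering it from the RM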
      ...
      for (Task task : getStartingTasks()) {
        context.setTask(task);
        context.getState().cancel(context);
      }
      for (Task task : getActiveTasks()) {
        context.setTask(task);
        context.getState().cancel(context);
      }
      ...
      ...
      context.yarn.killContainer(task.container);
      ...
      public void run() throws YarnFacadeException {
        ...
        boolean success = controller.waitForCompletion();
        ...
        ...
        finish(success, null);
        ...
      }
      public boolean waitForCompletion() {
        start();
        synchronized (completionMutex) {
          try {
            completionMutex.wait();
          } catch (InterruptedException e) {
          }
        }
        return succeeded();
      }
      public void finish(boolean succeeded, String msg) throws YarnFacadeException {
        nodeMgr.stop();
        String appMsg = "Drill Cluster Shut-Down";
        FinalApplicationStatus status = FinalApplicationStatus.SUCCEEDED;
        if (!succeeded) {
          appMsg = "Drill Cluster Fatal Error - check logs";
          status = FinalApplicationStatus.FAILED;
        }
        if (msg != null) {
          appMsg = msg;
        }
        try {
          resourceMgr.unregisterApplicationMaster(status, appMsg, "");
        } catch (YarnException | IOException e) {
          throw new YarnFacadeException("Deregister AM failed", e);
        }
        resourceMgr.stop();
      }
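The blocking hand-off in waitForCompletion can be illustrated in isolation. The sketch below is not the Drill implementation: `CompletionLatchSketch` and its `complete` method are hypothetical, and unlike the quoted code it waits inside a loop on a `done` flag, which is the pattern the `Object.wait` documentation recommends to guard against spurious wakeups.

```java
/** Hypothetical wait/notify hand-off between a blocked run loop and a shutdown event. */
final class CompletionLatchSketch {
    private final Object completionMutex = new Object();
    private boolean done;
    private boolean succeeded;

    /** Blocks until complete() has been called; returns the reported outcome. */
    boolean waitForCompletion() {
        synchronized (completionMutex) {
            while (!done) { // re-check the flag after every wakeup
                try {
                    completionMutex.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt
                    return false;
                }
            }
            return succeeded;
        }
    }

    /** Called when all tasks have ended; wakes up the waiting thread. */
    void complete(boolean success) {
        synchronized (completionMutex) {
            done = true;
            succeeded = success;
            completionMutex.notifyAll();
        }
    }
}
```

Because the flag is checked before waiting, a `complete` call that happens before `waitForCompletion` is not lost: the waiter returns immediately instead of blocking forever.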

4. Resize

The resize flow is: adjust quantity (the number of containers to keep); the polling thread then adjusts the tasks according to quantity and carries out the resize.

public int resize(int level) {
  int limit = quantity + state.getController().getFreeNodeCount() + maxExtraNodes;
  return super.resize(Math.min(limit, level));
}
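The clamping arithmetic in resize can be checked with a few concrete numbers. This is a minimal sketch: `ResizeSketch.clampResize` is a hypothetical helper that reproduces only the limit calculation and the `Math.min`, and it leaves out whatever `super.resize` does with the result.

```java
/** Hypothetical helper reproducing the limit calculation from resize(). */
final class ResizeSketch {
    /** Caps the requested level at quantity + free nodes + allowed extra nodes. */
    static int clampResize(int quantity, int freeNodeCount, int maxExtraNodes, int level) {
        int limit = quantity + freeNodeCount + maxExtraNodes;
        return Math.min(limit, level);
    }

    public static void main(String[] args) {
        // 2 drillbits now, 1 free node, 1 extra node allowed: the limit is 4.
        System.out.println(clampResize(2, 1, 1, 10)); // prints 4 (request capped)
        System.out.println(clampResize(2, 1, 1, 3));  // prints 3 (request within limit)
    }
}
```

A request above the limit is silently reduced to the limit, while a request at or below it passes through unchanged.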

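5. Summary

Overall, Drill-on-YARN is divided into two parts, drill-on-yarn.sh and drill-am.sh. drill-on-yarn.sh launches the ApplicationMaster, and drill-am.sh requests resources from the ResourceManager and starts the Drill cluster. Starting, stopping, scaling in, and scaling out Drill are each wrapped as a task: when one of these commands runs, a task is built and placed in the task queue. A thread polls this queue continuously and performs the operation that each task calls for, which is how the Drill cluster is started, stopped, scaled in, and scaled out. In addition, compared with a standalone deployment, the failover provided by Drill-on-YARN improves Drill's stability.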
