
DolphinScheduler Master Fault Tolerance: A Source Code Analysis


Contents

Introduction
Fault Tolerance Design
Master Fault Tolerance Source Code Analysis
    Master Startup Entry Point
    Master Registration at Startup
    Master Listening and Subscribing to Cluster State
    Master Failover Flow
    Rescheduling the Failed-Over Workflow
Summary

Introduction

We recently chose DolphinScheduler as the task scheduling system for our big data product. As a distributed scheduler, DolphinScheduler uses a decentralized, peer-to-peer architecture with multiple Masters and multiple Workers, which avoids overloading any single Master. In such an architecture, when a Master or Worker dies, fault-tolerant handling becomes indispensable. Below we walk through how that failover works, with an analysis of the relevant source code.

Fault Tolerance Design

For the detailed design, see the official documentation.

Fault tolerance relies on ZooKeeper's Watcher mechanism: each Master watches the registry directories of the other Masters and of the Workers, and when it observes a remove event it performs workflow-instance or task-instance failover according to the business logic involved.

The main directory names in ZooKeeper are listed below; keep them in mind, as each one shows up in the source code later:

public static final String REGISTRY_DOLPHINSCHEDULER_MASTERS = "/nodes/master";
public static final String REGISTRY_DOLPHINSCHEDULER_WORKERS = "/nodes/worker";
public static final String REGISTRY_DOLPHINSCHEDULER_NODE = "/nodes";
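
For illustration, with two masters and one worker registered, the tree under /nodes would look roughly like this (the addresses are hypothetical, and the exact worker subpath may vary by version):

/nodes
    /master
        /192.168.1.10:5678    (ephemeral znode, JSON heartbeat as payload)
        /192.168.1.11:5678
    /worker
        /192.168.1.20:1234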

The official architecture diagram is omitted here; see the official documentation.

Master Fault Tolerance Source Code Analysis

Master Startup Entry Point

The fault tolerance entry point is in MasterServer's run method:

@PostConstruct
public void run() throws SchedulerException {
	// init rpc server
	this.masterRPCServer.start();
	// install task plugin
	this.taskPluginManager.loadPlugin();
	// self tolerant
	this.masterRegistryClient.start();
	this.masterRegistryClient.setRegistryStoppable(this);
	this.masterSchedulerBootstrap.init();
	this.masterSchedulerBootstrap.start();
	this.eventExecuteService.start();
	this.failoverExecuteThread.start();
	this.schedulerApi.start();
	......
}

Master startup consists of a few main steps: starting the Netty-based RPC server, loading the task plugins, initializing the fault tolerance logic, initializing task scheduling, and starting the task event processing threads.

Here we only care about the fault-tolerance-related code, masterRegistryClient.start():

public void start() {
	try {
		this.masterHeartBeatTask = new MasterHeartBeatTask(masterConfig, registryClient);
		// master registry
		registry();
		registryClient.addConnectionStateListener(
				new MasterConnectionStateListener(masterConfig, registryClient, masterConnectStrategy));
		registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
	} catch (Exception e) {
		throw new RegistryException("Master registry client start up error", e);
	}
}

The start method above does three main things; let's go through them one by one.

Master Registration at Startup

First, it registers the current master's information in ZooKeeper and starts a heartbeat task that periodically refreshes that information. The node is created as an ephemeral znode, so it disappears automatically when the master's ZooKeeper session dies — which is exactly what produces the remove events the other masters watch for.


void registry() {
	logger.info("Master node : {} registering to registry center", masterConfig.getMasterAddress());
	String masterRegistryPath = masterConfig.getMasterRegistryPath();
	// remove before persist
	registryClient.remove(masterRegistryPath);
	registryClient.persistEphemeral(masterRegistryPath, JSONUtils.toJSONString(masterHeartBeatTask.getHeartBeat()));
	while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) {
		logger.warn("The current master server node:{} cannot find in registry", NetUtils.getHost());
		ThreadUtils.sleep(SLEEP_TIME_MILLIS);
	}
	// sleep 1s, waiting master failover remove
	ThreadUtils.sleep(SLEEP_TIME_MILLIS);
	masterHeartBeatTask.start();
	logger.info("Master node : {} registered to registry center successfully", masterConfig.getMasterAddress());
}

The master's registration path in ZK is built as follows; for a master at 192.168.1.10 listening on port 5678 (a hypothetical address), the path would be /nodes/master/192.168.1.10:5678:

masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
// /nodes/master/ip:listenPort
masterConfig.setMasterRegistryPath(REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + masterConfig.getMasterAddress());

What gets registered is essentially the master's own health snapshot, shown below, refreshed periodically by the heartbeat task:

@Override
public MasterHeartBeat getHeartBeat() {
	return MasterHeartBeat.builder()
			.startupTime(ServerLifeCycleManager.getServerStartupTime())
			.reportTime(System.currentTimeMillis())
			.cpuUsage(OSUtils.cpuUsage())
			.loadAverage(OSUtils.loadAverage())
			.availablePhysicalMemorySize(OSUtils.availablePhysicalMemorySize())
			.maxCpuloadAvg(masterConfig.getMaxCpuLoadAvg())
			.reservedMemory(masterConfig.getReservedMemory())
			.memoryUsage(OSUtils.memoryUsage())
			.diskAvailable(OSUtils.diskAvailable())
			.processId(processId)
			.build();
}
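
Serialized to JSON (this is the znode payload written by persistEphemeral above), a heartbeat looks roughly like this — all the values are illustrative:

{
    "startupTime": 1675390955000,
    "reportTime": 1675391015000,
    "cpuUsage": 0.12,
    "loadAverage": 0.8,
    "availablePhysicalMemorySize": 7.5,
    "maxCpuloadAvg": 16.0,
    "reservedMemory": 0.3,
    "memoryUsage": 0.35,
    "diskAvailable": 120.5,
    "processId": 4242
}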

Master Listening and Subscribing to Cluster State

Second, it listens for state changes on the connection between the ZK client and the cluster:

@Override
public void addConnectionStateListener(ConnectionListener listener) {
	client.getConnectionStateListenable().addListener(new ZookeeperConnectionStateListener(listener));
}

When the connection between client and server drops and is later re-established, MasterConnectionStateListener handles the state-change events (masterConnectStrategy is pluggable: depending on configuration, a disconnect either stops the server or makes it wait for the connection to recover):

@Override
public void onUpdate(ConnectionState state) {
	logger.info("Master received a {} event from registry, the current server state is {}", state,
			ServerLifeCycleManager.getServerStatus());
	switch (state) {
		case CONNECTED:
			break;
		case SUSPENDED:
			break;
		case RECONNECTED:
			masterConnectStrategy.reconnect();
			break;
		case DISCONNECTED:
			masterConnectStrategy.disconnect();
			break;
		default:
	}
}

Third, it subscribes to events under /nodes, the top-level directory above both the Master and Worker registration paths; the events of interest here are Remove events.

registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
@Override
public boolean subscribe(String path, SubscribeListener listener) {
	final TreeCache treeCache = treeCacheMap.computeIfAbsent(path, $ -> new TreeCache(client, path));
	treeCache.getListenable().addListener(($, event) -> listener.notify(new EventAdaptor(event, path)));
	try {
		treeCache.start();
	} catch (Exception e) {
		treeCacheMap.remove(path);
		throw new RegistryException("Failed to subscribe listener for key: " + path, e);
	}
	return true;
}

The subscription is implemented with the Curator client's TreeCache, which can watch a ZK path — and all of its descendant nodes — for both data and path changes.
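
As a minimal standalone sketch of this mechanism (not DolphinScheduler code; it assumes a ZooKeeper reachable at localhost:2181), watching /nodes for removals with Curator looks like this:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class RemoveWatchDemo {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        // watch /nodes and every descendant for data/path changes
        TreeCache cache = new TreeCache(client, "/nodes");
        cache.getListenable().addListener((c, event) -> {
            // a NODE_REMOVED under /nodes/master or /nodes/worker means a server died
            if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED) {
                System.out.println("removed: " + event.getData().getPath());
            }
        });
        cache.start();
        Thread.sleep(Long.MAX_VALUE); // keep the demo alive
    }
}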

Back in DolphinScheduler, a change reported by ZK ultimately calls back into MasterRegistryDataListener's notify method, which dispatches on the changed path under /nodes (e.g. /nodes/master/ip:port):

@Override
public void notify(Event event) {
	final String path = event.path();
	if (Strings.isNullOrEmpty(path)) {
		return;
	}
	// a change under /nodes/master is handled as a master event
	if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_MASTERS + Constants.SINGLE_SLASH)) {
		handleMasterEvent(event);
	} else if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH)) {
		// a change under /nodes/worker is handled as a worker event
		handleWorkerEvent(event);
	}
}

Master Failover Flow

On a Remove event, both handlers eventually reach FailoverService.failoverServerWhenDown with the host of the node that disappeared, and the flow branches by node type:
public void failoverServerWhenDown(String serverHost, NodeType nodeType) {
	switch (nodeType) {
		case MASTER:
			LOGGER.info("Master failover starting, masterServer: {}", serverHost);
			masterFailoverService.failoverMaster(serverHost);
			LOGGER.info("Master failover finished, masterServer: {}", serverHost);
			break;
		case WORKER:
			LOGGER.info("Worker failover staring, workerServer: {}", serverHost);
			workerFailoverService.failoverWorker(serverHost);
			LOGGER.info("Worker failover finished, workerServer: {}", serverHost);
			break;
		default:
			break;
	}
}

Master failover ultimately calls masterFailoverService.failoverMaster(serverHost), where serverHost is the ip:port of the failed Master node.

Because several Masters may all observe the remove event for the dead Master, each one first tries to acquire a distributed lock; only the Master that gets the lock performs the failover:

public void failoverMaster(String masterHost) {
	String failoverPath = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + masterHost;
	try {
		registryClient.getLock(failoverPath);
		doFailoverMaster(masterHost);
	} catch (Exception e) {
		LOGGER.error("Master server failover failed, host:{}", masterHost, e);
	} finally {
		registryClient.releaseLock(failoverPath);
	}
}

The overall process is summarized by the method's javadoc:

Failover master, will failover process instance and associated task instance. When the process instance belongs to the given masterHost and the restartTime is before the current server start up time, then the process instance will be failovered.

1. Query the DB for all workflow instances and task instances on the failed node's masterHost that need failover.

2. Compare each workflow instance's start time against the masterHost's startup time; instances started after the server came back up are skipped. If the node has not restarted at all, every instance found needs failover (see the sketch after the doFailoverMaster listing below).

3. Iterate over all task instances under each workflow instance and fail each one over.

4. Set the workflow instance's host to NULL and insert a Command of type RECOVER_TOLERANCE_FAULT_PROCESS into the command table.

private void doFailoverMaster(@NonNull String masterHost) {
	StopWatch failoverTimeCost = StopWatch.createStarted();
	Optional<Date> masterStartupTimeOptional = getServerStartupTime(registryClient.getServerList(NodeType.MASTER),
			masterHost);
	// 1. query all workflow instances that need failover on the dead node's masterHost
	List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(
			masterHost);
	if (CollectionUtils.isEmpty(needFailoverProcessInstanceList)) {
		return;
	}
	LOGGER.info(
			"Master[{}] failover starting there are {} workflowInstance may need to failover, will do a deep check, workflowInstanceIds: {}",
			masterHost,
			needFailoverProcessInstanceList.size(),
			needFailoverProcessInstanceList.stream().map(ProcessInstance::getId).collect(Collectors.toList()));
	for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
		try {
			LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
			LOGGER.info("WorkflowInstance failover starting");
			// 2. check whether this instance's start time qualifies it for failover
			if (!checkProcessInstanceNeedFailover(masterStartupTimeOptional, processInstance)) {
				LOGGER.info("WorkflowInstance doesn't need to failover");
				continue;
			}
			// todo: use batch query
			ProcessDefinition processDefinition =
					processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
							processInstance.getProcessDefinitionVersion());
			processInstance.setProcessDefinition(processDefinition);
			int processInstanceId = processInstance.getId();
			List<TaskInstance> taskInstanceList =
					processService.findValidTaskListByProcessId(processInstanceId);
			// 3. fail over each task instance, see failoverTaskInstance below
			for (TaskInstance taskInstance : taskInstanceList) {
				try {
					LoggerUtils.setTaskInstanceIdMDC(taskInstance.getId());
					LOGGER.info("TaskInstance failover starting");
					if (!checkTaskInstanceNeedFailover(taskInstance)) {
						LOGGER.info("The taskInstance doesn't need to failover");
						continue;
					}
					failoverTaskInstance(processInstance, taskInstance);
					LOGGER.info("TaskInstance failover finished");
				} finally {
					LoggerUtils.removeTaskInstanceIdMDC();
				}
			}
                        //4-insert a failover command
			ProcessInstanceMetrics.incProcessInstanceByState("failover");
			// updateProcessInstance host is null to mark this processInstance has been failover
			// and insert a failover command
			processInstance.setHost(Constants.NULL);
			processService.processNeedFailoverProcessInstances(processInstance);
			LOGGER.info("WorkflowInstance failover finished");
		} finally {
			LoggerUtils.removeWorkflowInstanceIdMDC();
		}
	}
	failoverTimeCost.stop();
	LOGGER.info("Master[{}] failover finished, useTime:{}ms",
			masterHost,
			failoverTimeCost.getTime(TimeUnit.MILLISECONDS));
}
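
The deep check from step 2 happens in checkProcessInstanceNeedFailover. A simplified sketch of its logic (condensed; the exact guards may differ between versions):

private boolean checkProcessInstanceNeedFailover(Optional<Date> masterStartupTimeOptional,
                                                 ProcessInstance processInstance) {
	// the dead master never re-registered, so everything it owned must be failed over
	if (!masterStartupTimeOptional.isPresent()) {
		return true;
	}
	Date masterStartupTime = masterStartupTimeOptional.get();
	Date workflowTime = processInstance.getRestartTime() != null
			? processInstance.getRestartTime()
			: processInstance.getStartTime();
	// started or restarted after the master came back up: a live master
	// already owns this instance, so skip it
	return workflowTime.before(masterStartupTime);
}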

processNeedFailoverProcessInstances constructs a Command of type RECOVER_TOLERANCE_FAULT_PROCESS:


@Override
@Transactional
public void processNeedFailoverProcessInstances(ProcessInstance processInstance) {
	// 1 update processInstance host is null
	processInstance.setHost(Constants.NULL);
	processInstanceMapper.updateById(processInstance);
	ProcessDefinition processDefinition = findProcessDefinition(processInstance.getProcessDefinitionCode(),
			processInstance.getProcessDefinitionVersion());
	// 2 insert into recover command
	Command cmd = new Command();
	cmd.setProcessDefinitionCode(processDefinition.getCode());
	cmd.setProcessDefinitionVersion(processDefinition.getVersion());
	cmd.setProcessInstanceId(processInstance.getId());
	cmd.setCommandParam(
			String.format("{\"%s\":%d}", CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
	cmd.setExecutorId(processInstance.getExecutorId());
	cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
	cmd.setProcessInstancePriority(processInstance.getProcessInstancePriority());
	createCommand(cmd);
}
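
Assuming CMD_PARAM_RECOVER_PROCESS_ID_STRING resolves to the string "ProcessInstanceId" (its value in the DolphinScheduler Constants class, as far as I can tell), the commandParam generated for workflow instance 123 would be:

{"ProcessInstanceId":123}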

Task instances under the workflow instance are handled by setting their state to TaskExecutionStatus.NEED_FAULT_TOLERANCE:

private void failoverTaskInstance(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
	TaskMetrics.incTaskInstanceByState("failover");
	boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());
	taskInstance.setProcessInstance(processInstance);
	if (!isMasterTask) {
		LOGGER.info("The failover taskInstance is not master task");
		TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
				.buildTaskInstanceRelatedInfo(taskInstance)
				.buildProcessInstanceRelatedInfo(processInstance)
				.buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
				.create();
		if (masterConfig.isKillYarnJobWhenTaskFailover()) {
			// only kill yarn job if exists , the local thread has exited
			LOGGER.info("TaskInstance failover begin kill the task related yarn job");
			ProcessUtils.killYarnJob(logClient, taskExecutionContext);
		}
		// kill worker task, When the master failover and worker failover happened in the same time,
		// the task may not be failover if we don't set NEED_FAULT_TOLERANCE.
		// This can be improved if we can load all task when cache a workflowInstance in memory
		sendKillCommandToWorker(taskInstance);
	} else {
		LOGGER.info("The failover taskInstance is a master task");
	}
	taskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE);
	processService.saveTaskInstance(taskInstance);
}

Rescheduling the Failed-Over Workflow

As mentioned earlier, after the Master starts, a MasterSchedulerBootstrap thread schedules tasks. In DolphinScheduler, every run — scheduled, one-off, or failover — is represented as a command inserted into the command table when it is due. The MasterSchedulerBootstrap thread's job is to continuously pull pending commands from that table and schedule them.


@Override
public void run() {
	while (!ServerLifeCycleManager.isStopped()) {
		try {
			if (!ServerLifeCycleManager.isRunning()) {
				// the current server is not at running status, cannot consume command.
				logger.warn("The current server {} is not at running status, cannot consumes commands.", this.masterAddress);
				Thread.sleep(Constants.SLEEP_TIME_MILLIS);
			}
			// todo: if the workflow event queue is much, we need to handle the back pressure
			boolean isOverload =
					OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
			if (isOverload) {
				logger.warn("The current server {} is overload, cannot consumes commands.", this.masterAddress);
				MasterServerMetrics.incMasterOverload();
				Thread.sleep(Constants.SLEEP_TIME_MILLIS);
				continue;
			}
			List<Command> commands = findCommands();
			if (CollectionUtils.isEmpty(commands)) {
				// indicate that no command ,sleep for 1s
				Thread.sleep(Constants.SLEEP_TIME_MILLIS);
				continue;
			}
			List<ProcessInstance> processInstances = command2ProcessInstance(commands);
			if (CollectionUtils.isEmpty(processInstances)) {
				// indicate that the command transform to processInstance error, sleep for 1s
				Thread.sleep(Constants.SLEEP_TIME_MILLIS);
				continue;
			}
			MasterServerMetrics.incMasterConsumeCommand(commands.size());
			processInstances.forEach(processInstance -> {
				try {
					LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
					if (processInstanceExecCacheManager.contains(processInstance.getId())) {
						logger.error(
								"The workflow instance is already been cached, this case shouldn't be happened");
					}
					WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance,
							processService,
							processInstanceDao,
							nettyExecutorManager,
							processAlertManager,
							masterConfig,
							stateWheelExecuteThread,
							curingGlobalParamsService);
					processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
					workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
							processInstance.getId()));
				} finally {
					LoggerUtils.removeWorkflowInstanceIdMDC();
				}
			});
		} catch (InterruptedException interruptedException) {
			logger.warn("Master schedule bootstrap interrupted, close the loop", interruptedException);
			Thread.currentThread().interrupt();
			break;
		} catch (Exception e) {
			logger.error("Master schedule workflow error", e);
			// sleep for 1s here to avoid the database down cause the exception boom
			ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
		}
	}
}

Each loop iteration fetches a batch of commands (10 by default) from the DB, converts them into workflow instances, and iterates over them.
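
findCommands (not shown above) does more than a plain SELECT: the live masters split the command table between them so that no two masters consume the same command. A simplified sketch of the idea — the helper names here are illustrative, not the exact DolphinScheduler API:

// each master only claims commands whose id maps to its own slot
private List<Command> findCommands() {
	int masterCount = serverNodeManager.getMasterSize(); // live masters, from the registry
	int mySlot = serverNodeManager.getSlot();            // this master's index among them
	if (masterCount <= 0) {
		return Collections.emptyList();
	}
	// roughly: SELECT ... FROM t_ds_command WHERE id % masterCount = mySlot LIMIT fetchNum
	return commandMapper.queryCommandPageBySlot(
			masterConfig.getFetchCommandNum(), mySlot, masterCount);
}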

The method to watch here is command2ProcessInstance, which converts the fetched commands into workflow instances. Internally it ends up calling constructProcessInstance (via handleCommand) to build each instance: the host of the node now handling the instance is written into it, and commands whose commandType is the failover type RECOVER_TOLERANCE_FAULT_PROCESS get special treatment (parts of the code omitted):

protected @Nullable ProcessInstance constructProcessInstance(Command command,
															 String host) throws CronParseException, CodeGenerateException {
	ProcessInstance processInstance;
	ProcessDefinition processDefinition;
	CommandType commandType = command.getCommandType();
	processDefinition =
			this.findProcessDefinition(command.getProcessDefinitionCode(), command.getProcessDefinitionVersion());
	if (processDefinition == null) {
		logger.error("cannot find the work process define! define code : {}", command.getProcessDefinitionCode());
		throw new IllegalArgumentException("Cannot find the process definition for this workflowInstance");
	}
	Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
	int processInstanceId = command.getProcessInstanceId();
	if (processInstanceId == 0) {
		processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
	} else {
		processInstance = this.findProcessInstanceDetailById(processInstanceId).orElse(null);
		if (processInstance == null) {
			return null;
		}
	}
	if (cmdParam != null) {
		......
	}
	// reset command parameter
	if (processInstance.getCommandParam() != null) {
		Map<String, String> processCmdParam = JSONUtils.toMap(processInstance.getCommandParam());
		processCmdParam.forEach((key, value) -> {
			if (!cmdParam.containsKey(key)) {
				cmdParam.put(key, value);
			}
		});
	}
	// reset command parameter if sub process
	if (cmdParam.containsKey(CommandKeyConstants.CMD_PARAM_SUB_PROCESS)) {
		processInstance.setCommandParam(command.getCommandParam());
	}
	if (Boolean.FALSE.equals(checkCmdParam(command, cmdParam))) {
		logger.error("command parameter check failed!");
		return null;
	}
	if (command.getScheduleTime() != null) {
		processInstance.setScheduleTime(command.getScheduleTime());
	}
	// set the handling node's host and the restartTime
	processInstance.setHost(host);
	processInstance.setRestartTime(new Date());
	WorkflowExecutionStatus runStatus = WorkflowExecutionStatus.RUNNING_EXECUTION;
	int runTime = processInstance.getRunTimes();
	switch (commandType) {
		case START_PROCESS:
			break;
		case START_FAILURE_TASK_PROCESS:
			// find failed tasks and init these tasks
			......
			break;
		case START_CURRENT_TASK_PROCESS:
			break;
		case RECOVER_WAITING_THREAD:
			break;
		case RECOVER_SUSPENDED_PROCESS:
			// find pause tasks and init task's state
			......
			break;
		// for failover commands, mark the instance with recovery Flag.YES
		case RECOVER_TOLERANCE_FAULT_PROCESS:
			// recover tolerance fault process
			processInstance.setRecovery(Flag.YES);
			processInstance.setRunTimes(runTime + 1);
			runStatus = processInstance.getState();
			break;
		case COMPLEMENT_DATA:
			// delete all the valid tasks when complement data if id is not null
			......
			break;
		case REPEAT_RUNNING:
			// delete the recover task names from command parameter
			......
			break;
		case SCHEDULER:
			break;
		default:
			break;
	}
	processInstance.setStateWithDesc(runStatus, commandType.getDescp());
	return processInstance;
}

Each workflow instance is then wrapped in a START_WORKFLOW event and added to a queue:

private static final LinkedBlockingQueue<WorkflowEvent> workflowEventQueue = new LinkedBlockingQueue<>();

public void addEvent(WorkflowEvent workflowEvent) {
	workflowEventQueue.add(workflowEvent);
	logger.info("Added workflow event to workflowEvent queue, event: {}", workflowEvent);
}

public WorkflowEvent poolEvent() throws InterruptedException {
	return workflowEventQueue.take();
}

Where there is a producer there is a consumer: the WorkflowEventLooper thread, started along with the Master, drains this queue (poolEvent blocks on take() until an event arrives; the name is an upstream typo for pollEvent).

this.masterSchedulerBootstrap.start(); is where it gets started:

@Override
public synchronized void start() {
	logger.info("Master schedule bootstrap starting..");
	super.start();
	workflowEventLooper.start();
	logger.info("Master schedule bootstrap started...");
}

The consumption logic:

public void run() {
	WorkflowEvent workflowEvent = null;
	while (!ServerLifeCycleManager.isStopped()) {
		try {
			workflowEvent = workflowEventQueue.poolEvent();
			LoggerUtils.setWorkflowInstanceIdMDC(workflowEvent.getWorkflowInstanceId());
			logger.info("Workflow event looper receive a workflow event: {}, will handle this", workflowEvent);
			WorkflowEventHandler workflowEventHandler =
					workflowEventHandlerMap.get(workflowEvent.getWorkflowEventType());
			workflowEventHandler.handleWorkflowEvent(workflowEvent);
		} catch (InterruptedException e) {
			logger.warn("WorkflowEventLooper thread is interrupted, will close this loop", e);
			Thread.currentThread().interrupt();
			break;
		} catch (WorkflowEventHandleException workflowEventHandleException) {
			logger.error("Handle workflow event failed, will add this event to event queue again, event: {}",
					workflowEvent, workflowEventHandleException);
			workflowEventQueue.addEvent(workflowEvent);
			ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
		} catch (WorkflowEventHandleError workflowEventHandleError) {
			logger.error("Handle workflow event error, will drop this event, event: {}",
					workflowEvent,
					workflowEventHandleError);
		} catch (Exception unknownException) {
			logger.error(
					"Handle workflow event failed, get a unknown exception, will add this event to event queue again, event: {}",
					workflowEvent, unknownException);
			workflowEventQueue.addEvent(workflowEvent);
			ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
		} finally {
			LoggerUtils.removeWorkflowInstanceIdMDC();
		}
	}
}

As you can see, it simply takes events off the queue and hands them to the matching handler; a start event ends up in WorkflowStartEventHandler:

@Override
public void handleWorkflowEvent(final WorkflowEvent workflowEvent) throws WorkflowEventHandleError {
	logger.info("Handle workflow start event, begin to start a workflow, event: {}", workflowEvent);
	WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager.getByProcessInstanceId(
		workflowEvent.getWorkflowInstanceId());
	if (workflowExecuteRunnable == null) {
		throw new WorkflowEventHandleError(
			"The workflow start event is invalid, cannot find the workflow instance from cache");
	}
	ProcessInstanceMetrics.incProcessInstanceByState("submit");
	ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
	CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool)
		.thenAccept(workflowSubmitStatue -> {
			if (WorkflowSubmitStatue.SUCCESS == workflowSubmitStatue) {
				// submit failed will resend the event to workflow event queue
				logger.info("Success submit the workflow instance");
				if (processInstance.getTimeout() > 0) {
					stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
				}
			} else {
				logger.error("Failed to submit the workflow instance, will resend the workflow start event: {}",
							 workflowEvent);
				workflowEventQueue.addEvent(workflowEvent);
			}
		});
}

which ultimately submits the workflow instance's tasks for processing:

The call method builds the workflow instance's DAG, initializes the task queues, and submits the DAG's head nodes:

@Override
public WorkflowSubmitStatue call() {
	if (isStart()) {
		// This case should not been happened
		logger.warn("[WorkflowInstance-{}] The workflow has already been started", processInstance.getId());
		return WorkflowSubmitStatue.DUPLICATED_SUBMITTED;
	}
	try {
		LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
		if (workflowRunnableStatus == WorkflowRunnableStatus.CREATED) {
			buildFlowDag();
			workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_DAG;
			logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
		}
		if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_DAG) {
			initTaskQueue();
			workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_QUEUE;
			logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
		}
		if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_QUEUE) {
			submitPostNode(null);
			workflowRunnableStatus = WorkflowRunnableStatus.STARTED;
			logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
		}
		return WorkflowSubmitStatue.SUCCESS;
	} catch (Exception e) {
		logger.error("Start workflow error", e);
		return WorkflowSubmitStatue.FAILED;
	} finally {
		LoggerUtils.removeWorkflowInstanceIdMDC();
	}
}

So where exactly are the tasks that need failover handled? Let's peek into initTaskQueue.

It first calls isNewProcessInstance to determine whether this is a brand-new workflow instance:

private boolean isNewProcessInstance() {
	if (Flag.YES.equals(processInstance.getRecovery())) {
		logger.info("This workInstance will be recover by this execution");
		return false;
	}
	if (WorkflowExecutionStatus.RUNNING_EXECUTION == processInstance.getState()
			&& processInstance.getRunTimes() == 1) {
		return true;
	}
	logger.info(
			"The workflowInstance has been executed before, this execution is to reRun, processInstance status: {}, runTimes: {}",
			processInstance.getState(),
			processInstance.getRunTimes());
	return false;
}

As the code shows, isNewProcessInstance returns false when the workflow instance's recovery attribute equals Flag.YES — and we saw earlier that converting a failover command into a ProcessInstance sets recovery to Flag.YES (a first run instead has state RUNNING_EXECUTION and runTimes == 1, which returns true). So for a failed-over instance, the following branch runs:

 if (!isNewProcessInstance()) {
		logger.info("The workflowInstance is not a newly running instance, runtimes: {}, recover flag: {}",
				processInstance.getRunTimes(),
				processInstance.getRecovery());
		List<TaskInstance> validTaskInstanceList =
				processService.findValidTaskListByProcessId(processInstance.getId());
		for (TaskInstance task : validTaskInstanceList) {
			try {
				LoggerUtils.setWorkflowAndTaskInstanceIDMDC(task.getProcessInstanceId(), task.getId());
				logger.info(
						"Check the taskInstance from a exist workflowInstance, existTaskInstanceCode: {}, taskInstanceStatus: {}",
						task.getTaskCode(),
						task.getState());
				if (validTaskMap.containsKey(task.getTaskCode())) {
					logger.warn("Have same taskCode taskInstance when init task queue, need to check taskExecutionStatus, taskCode:{}",
							task.getTaskCode());
					int oldTaskInstanceId = validTaskMap.get(task.getTaskCode());
					TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
					if (!oldTaskInstance.getState().isFinished() && task.getState().isFinished()) {
						task.setFlag(Flag.NO);
						processService.updateTaskInstance(task);
						continue;
					}
				}
				validTaskMap.put(task.getTaskCode(), task.getId());
				taskInstanceMap.put(task.getId(), task);
				if (task.isTaskComplete()) {
					logger.info("TaskInstance is already complete.");
					completeTaskMap.put(task.getTaskCode(), task.getId());
					continue;
				}
				if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()),
						dag)) {
					continue;
				}
				if (task.taskCanRetry()) {
					if (task.getState().isNeedFaultTolerance()) {
						logger.info("TaskInstance needs fault tolerance, will be added to standby list.");
						task.setFlag(Flag.NO);
						processService.updateTaskInstance(task);
						// tolerantTaskInstance add to standby list directly
						TaskInstance tolerantTaskInstance = cloneTolerantTaskInstance(task);
						addTaskToStandByList(tolerantTaskInstance);
					} else {
						logger.info("Retry taskInstance, taskState: {}", task.getState());
						retryTaskInstance(task);
					}
					continue;
				}
				if (task.getState().isFailure()) {
					errorTaskMap.put(task.getTaskCode(), task.getId());
				}
			} finally {
				LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
			}
		}
	} else {
		logger.info("The current workflowInstance is a newly running workflowInstance");
	}

This branch processes every task instance under the workflow instance: completed tasks go into completeTaskMap, and task.getState().isNeedFaultTolerance() distinguishes failover tasks from ordinary retries. Failover tasks are cloned and added straight to the readyToSubmitTaskQueue standby queue ("tolerantTaskInstance add to standby list directly").
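
The task.taskCanRetry() gate seen above covers both cases; a sketch of how it behaves (reconstructed, so verify against your version of the TaskInstance model):

public boolean taskCanRetry() {
	if (this.isSubProcess()) {
		return false;
	}
	if (this.getState() == TaskExecutionStatus.NEED_FAULT_TOLERANCE) {
		// a failover task is always eligible to be resubmitted
		return true;
	}
	// an ordinary failed task can retry only while it has retries left
	return this.getState() == TaskExecutionStatus.FAILURE
			&& this.getRetryTimes() < this.getMaxRetryTimes();
}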

Finally, submitPostNode triggers execution of the workflow instance's task instances — from that point on, no different from an ordinary run.

Summary

Master failover breaks down into roughly three phases:

1. Determine the failover scope: the whole step runs under a distributed lock, and the failed node's host is used to look up which workflow instances need failover.

2. Perform the failover: both workflow instances and task instances are failed over; before failing an instance over, its start time is compared with the server node's startup time, and instances started after the server came back up are skipped. The result is a command written to the command table.

3. Reschedule the failed-over work: once the scheduler thread picks up the command, it rebuilds the workflow instance, initializes its task instances into the appropriate queues based on the failover type, and schedules them again.

For more detail, see the official documentation.
