
Key Code in the Hadoop Map/Reduce Execution Flow

2012-09-18 13:36:43  Source: WEB开发网

JobClient.runJob(conf) | run the job
|-->JobClient jc = new JobClient(job);
|-->RunningJob rj = jc.submitJob(job);
	|-->submitJobInternal(job);
		|-->int reduces = job.getNumReduceTasks();
		|-->JobContext context = new JobContext(job, jobId);
		|-->maps = writeOldSplits(job, submitSplitFile);
		|-->job.setNumMapTasks(maps);
		|-->job.writeXml(out);
		|-->JobStatus status = jobSubmitClient.submitJob(jobId);
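In submitJobInternal above, writeOldSplits computes the input splits and job.setNumMapTasks(maps) fixes the map count to match. A minimal, runnable plain-Java sketch of that split arithmetic (SplitSketch and numSplits are illustrative stand-ins, not Hadoop's actual FileInputFormat code; the 10% slop factor mirrors its behavior):

```java
// Simplified sketch of how the number of map tasks is derived from input
// size: one split per chunk, so job.setNumMapTasks(maps) ends up equal to
// the split count. Not Hadoop's real code.
public class SplitSketch {
    /** Number of splits for a file of fileSize bytes, with a 10% slop
     *  factor so the last chunk may run up to 10% over splitSize. */
    public static int numSplits(long fileSize, long splitSize) {
        final double SPLIT_SLOP = 1.1;
        int splits = 0;
        long remaining = fileSize;
        while (((double) remaining) / splitSize > SPLIT_SLOP) {
            splits++;
            remaining -= splitSize;
        }
        if (remaining > 0) splits++;   // tail split
        return splits;
    }

    public static void main(String[] args) {
        // A 300 MB file with a 128 MB split size yields 3 map tasks.
        System.out.println(numSplits(300L << 20, 128L << 20));
    }
}
```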

JobTracker.submitJob(JobId) | submit the job
|-->JobInProgress job = new JobInProgress(jobId, this, this.conf);
|-->checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);  | check permissions
|-->checkMemoryRequirements(job);  | check memory requirements
|-->addJob(jobId, job);  | add to the job queue
	|-->jobs.put(job.getProfile().getJobID(), job);
	|--> for (JobInProgressListener listener : jobInProgressListeners) | notify the registered listeners, which the scheduler uses
		|-->listener.jobAdded(job);
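addJob above is a plain observer pattern: store the job, then notify every JobInProgressListener (the scheduler registers one). A hedged, runnable sketch with simplified stand-in types (ListenerSketch, JobListener, and the String job IDs are illustrative, not Hadoop's classes):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the JobTracker.addJob() listener pattern: the job is
// stored in a map, then each registered listener is told about it.
public class ListenerSketch {
    interface JobListener { void jobAdded(String jobId); }

    private final Map<String, Object> jobs = new HashMap<String, Object>();
    private final List<JobListener> listeners = new ArrayList<JobListener>();

    void addListener(JobListener l) { listeners.add(l); }

    void addJob(String jobId, Object job) {
        jobs.put(jobId, job);               // jobs.put(job.getProfile().getJobID(), job)
        for (JobListener l : listeners) {   // notify the scheduler's listener
            l.jobAdded(jobId);
        }
    }

    public static void main(String[] args) {
        ListenerSketch tracker = new ListenerSketch();
        List<String> seen = new ArrayList<String>();
        tracker.addListener(seen::add);     // the "scheduler" records new jobs
        tracker.addJob("job_201209180001_0001", new Object());
        System.out.println(seen);
    }
}
```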

JobTracker.heartbeat()  | called by TaskTrackers over RPC once the JobTracker is up; returns a response carrying the actions to run
|-->List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
|-->tasks = taskScheduler.assignTasks(taskTrackerStatus);  | the scheduler picks suitable tasks
|-->for (Task task : tasks)
	|-->expireLaunchingTasks.addNewTask(task.getTaskID());
	|-->actions.add(new LaunchTaskAction(task));  | in practice, actions also include commitTask, killTask, etc.
|-->response.setHeartbeatInterval(nextInterval);
|-->response.setActions(actions.toArray(new TaskTrackerAction[actions.size()]));
|-->return response;
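The heartbeat handler above boils down to: ask the scheduler for tasks, wrap each in a LaunchTaskAction, and return them together with the next heartbeat interval. A hedged, runnable sketch (Task, LaunchTaskAction, HeartbeatResponse, and TaskScheduler here are simplified stand-ins, not Hadoop's real types):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of the JobTracker.heartbeat() response assembly. Not Hadoop code.
public class HeartbeatSketch {
    static class Task { final String id; Task(String id) { this.id = id; } }
    static class LaunchTaskAction {
        final Task task;
        LaunchTaskAction(Task t) { this.task = t; }
    }
    static class HeartbeatResponse {
        final List<LaunchTaskAction> actions;
        final int intervalMs;
        HeartbeatResponse(List<LaunchTaskAction> a, int i) { actions = a; intervalMs = i; }
    }
    interface TaskScheduler { List<Task> assignTasks(String trackerName); }

    static HeartbeatResponse heartbeat(TaskScheduler scheduler, String tracker) {
        List<LaunchTaskAction> actions = new ArrayList<LaunchTaskAction>();
        for (Task t : scheduler.assignTasks(tracker)) {
            actions.add(new LaunchTaskAction(t)); // real code also adds commit/kill actions
        }
        return new HeartbeatResponse(actions, 3000); // next heartbeat interval
    }

    public static void main(String[] args) {
        TaskScheduler scheduler = tracker -> Arrays.asList(new Task("attempt_m_000000"));
        HeartbeatResponse r = heartbeat(scheduler, "tracker_host1");
        System.out.println(r.actions.size() + " action(s), next interval " + r.intervalMs + " ms");
    }
}
```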


TaskTracker.offerService | after startup, the TaskTracker loops in offerService(), continually sending heartbeats to the JobTracker
|-->transmitHeartBeat()
	|-->HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, justStarted, justInited,askForNewTask, heartbeatResponseId);
|-->TaskTrackerAction[] actions = heartbeatResponse.getActions();
|-->for(TaskTrackerAction action: actions)
	|-->if (action instanceof LaunchTaskAction)
		|-->addToTaskQueue((LaunchTaskAction)action);  | add to the launch queue; map and reduce tasks go to separate launchers
			|-->if (action.getTask().isMapTask()) {
				|-->mapLauncher.addToTaskQueue(action);
					|-->TaskInProgress tip = registerTask(action, this);
					|-->tasksToLaunch.add(tip);
					|-->tasksToLaunch.notifyAll();  | wake the blocked launcher thread
			|-->else 
				|-->reduceLauncher.addToTaskQueue(action);

TaskLauncher.run()
|--> while (tasksToLaunch.isEmpty()) 
             |-->tasksToLaunch.wait();
|-->tip = tasksToLaunch.remove(0);
|-->startNewTask(tip);
	|-->localizeJob(tip);
		|-->launchTaskForJob(tip, new JobConf(rjob.jobConf)); 
			|-->tip.setJobConf(jobConf);
			|-->tip.launchTask();  |TaskInProgress.launchTask()
				|-->this.runner = task.createRunner(TaskTracker.this, this); | map and reduce tasks create different runners
				|-->this.runner.start();
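The addToTaskQueue/TaskLauncher.run pair above is the classic guarded-block handoff: the producer appends and calls notifyAll(), while the launcher thread blocks in wait() until the queue is non-empty. A minimal, runnable plain-Java sketch (LauncherSketch and the String task IDs are illustrative stand-ins, not Hadoop classes):

```java
import java.util.LinkedList;
import java.util.List;

// Sketch of the TaskLauncher queue: producer adds and notifies, consumer
// waits while empty, then removes the head and launches it.
public class LauncherSketch {
    private final List<String> tasksToLaunch = new LinkedList<String>();

    // Producer side (heartbeat-handling thread): addToTaskQueue(...)
    public void addToTaskQueue(String tip) {
        synchronized (tasksToLaunch) {
            tasksToLaunch.add(tip);
            tasksToLaunch.notifyAll();   // wake the blocked launcher thread
        }
    }

    // Consumer side (TaskLauncher.run())
    public String takeNextTask() throws InterruptedException {
        synchronized (tasksToLaunch) {
            while (tasksToLaunch.isEmpty()) {
                tasksToLaunch.wait();    // releases the lock until notified
            }
            return tasksToLaunch.remove(0);
        }
    }

    public static void main(String[] args) throws Exception {
        LauncherSketch launcher = new LauncherSketch();
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("launching " + launcher.takeNextTask());
            } catch (InterruptedException ignored) { }
        });
        consumer.start();
        launcher.addToTaskQueue("task_m_000000");
        consumer.join();
    }
}
```

The wait() sits inside a while loop, not an if, so a spurious wakeup or a competing consumer re-checks the emptiness condition before proceeding.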
MapTaskRunner.run()  | runs the MapTask
|-->File workDir = new File(lDirAlloc.getLocalPathToRead()  | prepare the working directory
|-->String jar = conf.getJar();  | locate the job jar
|-->File jvm = new File(new File(System.getProperty("java.home"), "bin"), "java");  | locate the JVM executable
|-->vargs.add(Child.class.getName());  | add args; the Child class is the child JVM's main class
|-->tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf, pidFile);  | register with the memory manager
|-->jvmManager.launchJvm(this, jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,  | hand off to the JVM manager, which launches the child JVM
				workDir, env, pidFile, conf));
		|-->mapJvmManager.reapJvm(t, env);  | separate managers for map and reduce JVMs

JvmManager.reapJvm()  | reuse an idle JVM for the task if possible, else spawn a new one
|--> while (jvmIter.hasNext())
	|-->JvmRunner jvmRunner = jvmIter.next().getValue();
	|-->JobID jId = jvmRunner.jvmId.getJobId();
	|-->setRunningTaskForJvm(jvmRunner.jvmId, t);
|-->spawnNewJvm(jobId, env, t);
	|-->JvmRunner jvmRunner = new JvmRunner(env,jobId);
        |-->jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
	|-->jvmRunner.start();   | starts the thread, which executes JvmRunner.run()
		|-->jvmRunner.run()
			|-->runChild(env);
				|-->List<String> wrappedCommand =  TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
						 env.logSize, env.pidFile);  | wrap the command so stdout/stderr are captured to log files
				|-->shexec.execute();  | run the child JVM
				|-->int exitCode = shexec.getExitCode(); | get the exit code
				|--> updateOnJvmExit(jvmId, exitCode, killed); | update the JVM's state on exit
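runChild above reduces to: build the child JVM's command line, execute it, and collect the exit code. A hedged sketch using java.lang.ProcessBuilder as a stand-in for Hadoop's ShellCommandExecutor (RunChildSketch is illustrative; it assumes a POSIX `sh` is available):

```java
// Stand-in for JvmRunner.runChild(): spawn a child process and wait for
// its exit code. Hadoop uses ShellCommandExecutor; ProcessBuilder is the
// JDK equivalent used here for a self-contained example.
public class RunChildSketch {
    public static int runChild(String... command) throws Exception {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.inheritIO();            // real code redirects to per-task stdout/stderr log files
        Process child = pb.start();
        return child.waitFor();    // corresponds to shexec.getExitCode()
    }

    public static void main(String[] args) throws Exception {
        int exitCode = runChild("sh", "-c", "exit 0");
        System.out.println("child exited with " + exitCode);
    }
}
```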

Child.main()  | runs the task (map/reduce) inside the child JVM
|-->JVMId jvmId = new JVMId(firstTaskid.getJobID(),firstTaskid.isMap(),jvmIdInt);
|-->TaskUmbilicalProtocol umbilical = (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
		TaskUmbilicalProtocol.versionID, address, defaultConf);
|--> while (true) 
	|-->JvmTask myTask = umbilical.getTask(jvmId);
	|-->task = myTask.getTask();
	|-->taskid = task.getTaskID();
	|-->TaskRunner.setupWorkDir(job);
	|-->task.run(job, umbilical);   | MapTask taken as the example below
		|-->TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
		|-->if (useNewApi)
			|-->runNewMapper(job, split, umbilical, reporter);
		|-->else
			|-->runOldMapper(job, split, umbilical, reporter);
				|-->inputSplit = (InputSplit) ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
				|-->MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =  ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
				|-->runner.run(in, new OldOutputCollector(collector, conf), reporter);

MapRunner.run()
|--> K1 key = input.createKey();
|-->V1 value = input.createValue();
|-->while (input.next(key, value)) 
	|-->mapper.map(key, value, output, reporter);
	|--> if(incrProcCount) 
		|-->reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, 
                |-->SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
|-->mapper.close();
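The MapRunner.run() loop above is the heart of the map side: pull (key, value) records until the input is exhausted, pass each to the Mapper, then close it. A minimal, runnable plain-Java sketch (MapRunnerSketch and its Mapper interface are simplified stand-ins for Hadoop's RecordReader/Mapper, not the real API):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the MapRunner.run() record loop. Not Hadoop code.
public class MapRunnerSketch {
    interface Mapper<K, V> {
        void map(K key, V value, List<String> output);
        void close();
    }

    static <K, V> void run(Iterator<Map.Entry<K, V>> input,
                           Mapper<K, V> mapper, List<String> output) {
        while (input.hasNext()) {                 // while (input.next(key, value))
            Map.Entry<K, V> record = input.next();
            mapper.map(record.getKey(), record.getValue(), output);
        }
        mapper.close();                           // mapper.close()
    }

    public static void main(String[] args) {
        // Input records keyed by byte offset, as a line-oriented reader would emit.
        Map<Long, String> lines = new LinkedHashMap<Long, String>();
        lines.put(0L, "hello");
        lines.put(6L, "world");
        List<String> out = new ArrayList<String>();
        run(lines.entrySet().iterator(), new Mapper<Long, String>() {
            public void map(Long offset, String line, List<String> output) {
                output.add(line.toUpperCase());   // trivial per-record transform
            }
            public void close() { }
        }, out);
        System.out.println(out);                  // [HELLO, WORLD]
    }
}
```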

    

Tags: Hadoop, Map/Reduce

Editor: 爽爽