From 111604c5c31447a040bf849aecc7612ee6c4e55a Mon Sep 17 00:00:00 2001 From: Dennis Fetterly Date: Fri, 26 Jul 2013 10:37:15 -0700 Subject: [PATCH] Fix shutdown bug. --- CommonCode/SchedulerHelper.cs | 5 ++- DryadYarnBridge/YarnAppMasterManaged.cpp | 4 +-- DryadYarnBridge/YarnAppMasterManaged.h | 2 +- DryadYarnBridge/YarnAppMasterNative.cpp | 22 +++++++++---- DryadYarnBridge/YarnAppMasterNative.h | 2 +- GraphManager/filesystem/DrHdfsClient.cpp | 10 +++--- Java/DryadAppMaster.java | 41 +++++++++++++++--------- 7 files changed, 53 insertions(+), 33 deletions(-) diff --git a/CommonCode/SchedulerHelper.cs b/CommonCode/SchedulerHelper.cs index c17a87d..4882099 100644 --- a/CommonCode/SchedulerHelper.cs +++ b/CommonCode/SchedulerHelper.cs @@ -459,7 +459,7 @@ namespace Microsoft.Research.Dryad void ISchedulerHelper.FinishJob() { - m_appMaster.Finish(); + } string ISchedulerHelper.GetVertexServiceBaseAddress(string nodename, int instanceId) @@ -535,6 +535,8 @@ namespace Microsoft.Research.Dryad } } + m_appMaster.Finish(true); + if (wait) { try @@ -619,6 +621,7 @@ namespace Microsoft.Research.Dryad DryadLogger.LogInformation("QueueYarnUpdate", "Task {0} on node {2} is in state {3}", taskId, nodeName, taskState); // Set change event arguments + YarnTaskState yTaskState = (YarnTaskState)taskState; VertexTask v = new VertexTask(taskId, nodeName, yTaskState, int.MaxValue, DateTime.UtcNow); m_taskUpdateQueue.Add(v); diff --git a/DryadYarnBridge/YarnAppMasterManaged.cpp b/DryadYarnBridge/YarnAppMasterManaged.cpp index f76c2b9..c764d52 100644 --- a/DryadYarnBridge/YarnAppMasterManaged.cpp +++ b/DryadYarnBridge/YarnAppMasterManaged.cpp @@ -62,12 +62,12 @@ namespace Microsoft { namespace Research { namespace Dryad { namespace YarnBridg } } - void AMInstance::Finish() + void AMInstance::Finish(bool success) { if (m_instance != IntPtr::Zero) { AMNativeInstance *instance = (AMNativeInstance *) m_instance.ToPointer(); - instance->Shutdown(); + instance->Shutdown(success); } } diff --git a/DryadYarnBridge/YarnAppMasterManaged.h b/DryadYarnBridge/YarnAppMasterManaged.h index 0f2a004..f75576f 100644 --- a/DryadYarnBridge/YarnAppMasterManaged.h +++ b/DryadYarnBridge/YarnAppMasterManaged.h @@ -37,7 +37,7 @@ namespace Microsoft { namespace Research { namespace Dryad { namespace YarnBridg void Close(); - void Finish(); + void Finish(bool success); int GetHealthyNodeCount(); diff --git a/DryadYarnBridge/YarnAppMasterNative.cpp b/DryadYarnBridge/YarnAppMasterNative.cpp index 02c7403..5d1adc7 100644 --- a/DryadYarnBridge/YarnAppMasterNative.cpp +++ b/DryadYarnBridge/YarnAppMasterNative.cpp @@ -246,8 +246,8 @@ namespace DryadYarn return false; } - jmethodID midShutdown = m_env->e->GetMethodID(clsDryadAppMaster, "shutdown", "()V"); - if (midSchProc == NULL) + jmethodID midShutdown = m_env->e->GetMethodID(clsDryadAppMaster, "shutdown", "(ZZ)V"); + if (midShutdown == NULL) { jthrowable exc; exc = m_env->e->ExceptionOccurred(); @@ -288,7 +288,6 @@ namespace DryadYarn jstring jName = env->NewStringUTF(name); jstring jCmdLine = env->NewStringUTF(commandLine); - env->CallVoidMethod(m_inst->m_obj, m_inst->m_midSchProc, vertexId, jName, jCmdLine); env->DeleteLocalRef(jName); @@ -299,16 +298,25 @@ namespace DryadYarn return true; } - bool AMNativeInstance::Shutdown() + bool AMNativeInstance::Shutdown(bool success) { fprintf(stderr, "Shutting down AMNativeInstance\n"); fflush(stderr); JNIEnv* env = AttachToJvm(); - - env->CallVoidMethod(m_inst->m_obj, m_inst->m_midShutdown); + fprintf(stderr, "Calling Shutdown\n"); + fflush(stderr); + + jboolean jImmedShutdown = 0; + jboolean jSuccess = 0; + if (success) + { + jSuccess = 1; + } + env->CallVoidMethod(m_inst->m_obj, m_inst->m_midShutdown, jImmedShutdown, jSuccess); // detach here? - + fprintf(stderr, "Finished Shutdown\n"); + fflush(stderr); return true; } diff --git a/DryadYarnBridge/YarnAppMasterNative.h b/DryadYarnBridge/YarnAppMasterNative.h index 6e90864..63cbef6 100644 --- a/DryadYarnBridge/YarnAppMasterNative.h +++ b/DryadYarnBridge/YarnAppMasterNative.h @@ -70,7 +70,7 @@ namespace DryadYarn char* GetExceptionMessage(); bool ScheduleProcess(int vertexId, const char* name, const char* commandLine); - bool Shutdown(); + bool Shutdown(bool success); private: //void* operator new( size_t ); diff --git a/GraphManager/filesystem/DrHdfsClient.cpp b/GraphManager/filesystem/DrHdfsClient.cpp index f921b13..e107715 100644 --- a/GraphManager/filesystem/DrHdfsClient.cpp +++ b/GraphManager/filesystem/DrHdfsClient.cpp @@ -339,11 +339,11 @@ DrString DrHdfsOutputStream::GetURIForWrite(int partitionIndex, DrMetaDataRef /*metaData */) { DrAssert(m_hdfsInstance != DrNull); - DrString fileName; - fileName.Set(m_baseUri); - //String^ fileName = m_baseUri + "-tmp/part-" + partitionIndex.ToString("D8") + "." + version; - fileName.AppendF("-tmp/part-%8d.%d", partitionIndex, version); - return fileName; + String^ fileName = m_baseUri + "-tmp/part-" + partitionIndex.ToString("D8") + "." + version; + + //DrLogI("HDFS GetURIForWrite returning '%s'", fileName); // DCF HDFS debug + + return DrString(fileName); } void DrHdfsOutputStream::DiscardUnusedPartition(int partitionIndex, diff --git a/Java/DryadAppMaster.java b/Java/DryadAppMaster.java index 758d3aa..f05837a 100644 --- a/Java/DryadAppMaster.java +++ b/Java/DryadAppMaster.java @@ -134,7 +134,7 @@ public class DryadAppMaster { log = LogFactory.getLog("DryadAppMaster"); log.info("In DryadAppMaster constructor"); - shuttingDown = new AtomicBoolean(); + shuttingDown = new AtomicBoolean(false); scheduleProcesses = new AtomicBoolean(true); responseId = new AtomicInteger(); nextVertexId = new AtomicInteger(2); //first vertex id is 2 to map to Dryad Vertex Scheduler @@ -233,19 +233,20 @@ public class DryadAppMaster { // check to see if we should cancel the heartbeat if (shuttingDown.get()) { + log.info("Cancelling heartbeat"); heartbeatHandle.cancel(true); - } - log.info("Sending heartbeat to the RM"); - AllocateResponse response = sendAllocateRequest(); - if (response != null) { - int oldNodeCount = clusterNodeCount; - clusterNodeCount = response.getNumClusterNodes(); - if (clusterNodeCount != oldNodeCount) { - log.info("There are now " + clusterNodeCount + " available nodes on the cluster."); + } else { + log.info("Sending heartbeat to the RM"); + AllocateResponse response = sendAllocateRequest(); + if (response != null) { + int oldNodeCount = clusterNodeCount; + clusterNodeCount = response.getNumClusterNodes(); + if (clusterNodeCount != oldNodeCount) { + log.info("There are now " + clusterNodeCount + " available nodes on the cluster."); + } + processResponse(response); } - processResponse(response); } - } private void launchContainer(Container container, ContainerManager cm) @@ -317,7 +318,9 @@ public class DryadAppMaster { // is this the first allocation? if (scheduleProcesses.compareAndSet(true, false)) { - int numProcessesToStart = Math.max(response.getNumClusterNodes() - 1, maxNodes); //don't schedule a process where the graph manager is running + //don't schedule a process where the graph manager is running + int numProcessesToStart = Math.max(response.getNumClusterNodes() - 1, maxNodes); + log.info("There are " + response.getNumClusterNodes() + " nodes in the cluster. maxNodes = " + maxNodes); scheduleProcess(numProcessesToStart); } @@ -339,7 +342,8 @@ public class DryadAppMaster // Need to notify graph manager of current state VertexInfo vi = runningContainers.remove(cid); - if (vi != null) { + //only send events up the stack when we are not shutting down + if (vi != null && !shuttingDown.get()) { int containerState = 0; if (containerStatus.getState() == ContainerState.COMPLETE) { if (containerStatus.getExitStatus() == 0) { @@ -426,8 +430,8 @@ public class DryadAppMaster } } - public void shutdown(boolean immediateShutdown) - { + public void shutdown(boolean immediateShutdown, boolean success) + { shuttingDown.set(true); heartbeatHandle.cancel(immediateShutdown); // if we are shutting down, we can just interrupt the running thread, if necessary log.info("Shutdown heartbeats to RM"); @@ -435,13 +439,18 @@ public class DryadAppMaster // send the shutdown message to the RM FinishApplicationMasterRequest request = Records.newRecord(FinishApplicationMasterRequest.class); request.setAppAttemptId(appAttemptID); - request.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED); // NYI - determine success + if (success) { + request.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED); + } else { + request.setFinishApplicationStatus(FinalApplicationStatus.FAILED); + } try { //response is currently an empty class FinishApplicationMasterResponse response = resourceManager. finishApplicationMaster(request); } catch (YarnRemoteException|IOException e) { log.error("Error communicating with RM: " + e.getMessage() , e); } + log.info("FinishApplicationMasterRequest sent"); } private void startContainers(List newContainers)