diff --git a/JobBrowser/JOM/JobObjectModel.csproj b/JobBrowser/JOM/JobObjectModel.csproj
index cabcc93..626d232 100644
--- a/JobBrowser/JOM/JobObjectModel.csproj
+++ b/JobBrowser/JOM/JobObjectModel.csproj
@@ -77,7 +77,7 @@
False..\packages\Microsoft.Research.Peloponnese.0.7.2-beta\lib\net45\Microsoft.Research.Peloponnese.HadoopBridge.dll
-
+ False..\packages\Microsoft.Research.Peloponnese.0.7.2-beta\lib\net45\Microsoft.Research.Peloponnese.Utils.dll
diff --git a/JobBrowser/JobBrowser/JobBrowser.csproj b/JobBrowser/JobBrowser/JobBrowser.csproj
index fe6aed5..e8d8b64 100644
--- a/JobBrowser/JobBrowser/JobBrowser.csproj
+++ b/JobBrowser/JobBrowser/JobBrowser.csproj
@@ -115,10 +115,12 @@
False..\packages\Microsoft.Research.Peloponnese.0.7.2-beta\lib\net45\Microsoft.Research.Peloponnese.HadoopBridge.dll
+ TrueFalse..\packages\Microsoft.Research.Peloponnese.0.7.2-beta\lib\net45\Microsoft.Research.Peloponnese.Utils.dll
+ True..\packages\Microsoft.Bcl.Async.1.0.166\lib\net40\Microsoft.Threading.Tasks.dll
diff --git a/LinqToDryad/LocalJobSubmission.cs b/LinqToDryad/LocalJobSubmission.cs
index e3c9199..f1e7fb5 100644
--- a/LinqToDryad/LocalJobSubmission.cs
+++ b/LinqToDryad/LocalJobSubmission.cs
@@ -284,7 +284,7 @@ namespace Microsoft.Research.DryadLinq
{
if (Context.PeloponneseHomeDirectory == null)
{
- throw new ApplicationException("No Peloponnese home directory is set");
+ Context.PeloponneseHomeDirectory = Context.DryadHomeDirectory;
}
if (Context.DryadHomeDirectory == null)
{
diff --git a/LinqToDryad/YarnJobSubmission.cs b/LinqToDryad/YarnJobSubmission.cs
index bcd8891..92a5d69 100644
--- a/LinqToDryad/YarnJobSubmission.cs
+++ b/LinqToDryad/YarnJobSubmission.cs
@@ -267,7 +267,7 @@ namespace Microsoft.Research.DryadLinq
{
if (Context.PeloponneseHomeDirectory == null)
{
- throw new ApplicationException("No Peloponnese home directory is set");
+ Context.PeloponneseHomeDirectory = Context.DryadHomeDirectory;
}
if (Context.DryadHomeDirectory == null)
{
diff --git a/LocalScheduler/Computer.cs b/LocalScheduler/Computer.cs
index 38023c7..0c44934 100644
--- a/LocalScheduler/Computer.cs
+++ b/LocalScheduler/Computer.cs
@@ -94,6 +94,14 @@ namespace Microsoft.Research.Dryad.LocalScheduler
///
private TaskCompletionSource finishWaiter;
+ /// <summary>
+ /// child waiters that are completed when finishWaiter is unblocked. These are materialized into a
+ /// separate set so that each one can be discarded as soon as it is no longer needed. Otherwise,
+ /// continuations hung directly off finishWaiter and passed to WhenAny keep the other tasks in that
+ /// WhenAny call reachable until finishWaiter completes, which is a huge memory leak
+ /// </summary>
+ private HashSet> childFinishWaiters;
+
///
/// this blocks until the command loop exits
///
@@ -140,6 +148,8 @@ namespace Microsoft.Research.Dryad.LocalScheduler
// make the Task that CommandLoop blocks on; when finishWaiter is started it returns null
// causing CommandLoop to exit.
finishWaiter = new TaskCompletionSource();
+ childFinishWaiters = new HashSet>();
+ finishWaiter.Task.ContinueWith((t) => Task.Run(() => SetChildFinishWaiters()));
// this is started when the Command Loop exits
exited = new TaskCompletionSource();
@@ -201,9 +211,54 @@ namespace Microsoft.Research.Dryad.LocalScheduler
}
///
- /// a task that can be awaited and will asynchronously unblock when the finishWaiter result is set
+ /// set all the pending cancellations from the master finishWaiter
///
- private Task AsyncFinishWaiter { get { return finishWaiter.Task.ContinueWith((t) => t.Result); } }
+ private void SetChildFinishWaiters() // completes every registered child waiter with finishWaiter's result, then retires the set
+ {
+ lock (this) // same lock as GetAsyncFinishWaiter/RemoveAsyncFinishWaiter, so the set cannot change while it is enumerated
+ {
+ foreach (TaskCompletionSource waiter in childFinishWaiters)
+ {
+ waiter.SetResult(finishWaiter.Task.Result); // NOTE(review): assumes finishWaiter has already completed (this method is scheduled from its continuation); otherwise .Result would block
+ }
+ childFinishWaiters = null; // null signals GetAsyncFinishWaiter that later waiters must be completed immediately
+ }
+ }
+
+ /// <summary>
+ /// get a completion source whose Task can be awaited and is unblocked with the finishWaiter result; callers pass it to RemoveAsyncFinishWaiter when done
+ /// </summary>
+ private TaskCompletionSource GetAsyncFinishWaiter()
+ {
+ TaskCompletionSource thisCompletion = new TaskCompletionSource();
+ lock (this) // guards childFinishWaiters against concurrent SetChildFinishWaiters/RemoveAsyncFinishWaiter
+ {
+ if (childFinishWaiters == null) // the set is nulled by SetChildFinishWaiters once the pending waiters have been completed
+ {
+ thisCompletion.SetResult(finishWaiter.Task.Result); // NOTE(review): presumably finishWaiter is already complete on this path, so .Result does not block -- confirm
+ }
+ else
+ {
+ childFinishWaiters.Add(thisCompletion); // registered so SetChildFinishWaiters completes it later
+ }
+ }
+ return thisCompletion;
+ }
+
+
+ /// <summary>
+ /// take the finish waiter out of the set of pending waiters once the caller has finished awaiting it, so the set does not keep it alive
+ /// </summary>
+ /// <param name="waiter">waiter to remove</param>
+ private void RemoveAsyncFinishWaiter(TaskCompletionSource waiter)
+ {
+ lock (this) // same lock as GetAsyncFinishWaiter and SetChildFinishWaiters
+ {
+ if (childFinishWaiters != null) // null means SetChildFinishWaiters already retired the set; nothing to remove
+ {
+ childFinishWaiters.Remove(waiter);
+ }
+ }
+ }
///
/// (asynchronously) block until there is a process available on the local queue, the rack queue
@@ -248,7 +303,9 @@ namespace Microsoft.Research.Dryad.LocalScheduler
// we want to wait either for waiter to be matched with a Process in one of the three queues, or
// for ShutDown to be called, so make an array of tasks and wait for the first one to be unblocked.
- var unblocked = await Task.WhenAny(blocker, AsyncFinishWaiter);
+ TaskCompletionSource thisWaiter = GetAsyncFinishWaiter();
+ var unblocked = await Task.WhenAny(blocker, thisWaiter.Task);
+ RemoveAsyncFinishWaiter(thisWaiter);
if (unblocked.Result != null)
{
@@ -341,7 +398,9 @@ namespace Microsoft.Research.Dryad.LocalScheduler
{
logger.Log("Computer " + name + " reporting match with process " + process.Id);
- await process.OnScheduled(this, nextTask, AsyncFinishWaiter, null);
+ TaskCompletionSource thisWaiter = GetAsyncFinishWaiter();
+ await process.OnScheduled(this, nextTask, thisWaiter.Task, null);
+ RemoveAsyncFinishWaiter(thisWaiter);
logger.Log("Computer " + name + " waiting for process " + process.Id + " to complete");
diff --git a/LocalScheduler/Process.cs b/LocalScheduler/Process.cs
index cfa8afa..12ce99e 100644
--- a/LocalScheduler/Process.cs
+++ b/LocalScheduler/Process.cs
@@ -137,7 +137,7 @@ namespace Microsoft.Research.Dryad.LocalScheduler
public void DecrementQueueCount()
{
Debug.Assert(queueCount > 0);
- ++queueCount;
+ --queueCount;
if (owner == null && !scheduling && queueCount == 0)
{
// the queue count has dropped to zero without the process being matched
diff --git a/LocalScheduler/Queues.cs b/LocalScheduler/Queues.cs
index fdd0e9a..8dc0bca 100644
--- a/LocalScheduler/Queues.cs
+++ b/LocalScheduler/Queues.cs
@@ -131,6 +131,11 @@ namespace Microsoft.Research.Dryad.LocalScheduler
{
processQueue = new Queue();
waiterQueue = new Queue();
+
+ // start background cleaning tasks
+ CleanProcessQueue();
+ CleanWaiterQueue();
+
active = true;
}
@@ -164,6 +169,76 @@ namespace Microsoft.Research.Dryad.LocalScheduler
}
}
+ /// <summary>
+ /// background loop that periodically rebuilds processQueue keeping only unclaimed processes, so we
+ /// don't hang on to memory indefinitely; it exits once processQueue has been set to null
+ /// </summary>
+ private async void CleanProcessQueue() // NOTE(review): async void, so the caller cannot await this loop or observe an exception thrown inside it
+ {
+ while (true)
+ {
+ lock (this) // the queue is only inspected and swapped while holding the lock
+ {
+ if (processQueue == null)
+ {
+ // we have shut down, so exit this daemon
+ return;
+ }
+
+ Queue cleanedQueue = new Queue();
+ foreach (Process p in processQueue) // enumerates in queue order, so surviving processes keep their relative order
+ {
+ lock (p) // NOTE(review): presumably Unclaimed is updated under the per-process lock -- verify against Process
+ {
+ if (p.Unclaimed)
+ {
+ cleanedQueue.Enqueue(p);
+ }
+ }
+ }
+ processQueue = cleanedQueue; // the old queue, including claimed entries, is no longer referenced from this field
+ }
+
+ // clean again in a second; the delay is awaited after the lock has been released
+ await Task.Delay(1000);
+ }
+ }
+
+ /// <summary>
+ /// background loop that periodically rebuilds waiterQueue keeping only unclaimed waiters, so we
+ /// don't hang on to memory indefinitely; it exits once processQueue has been set to null
+ /// </summary>
+ private async void CleanWaiterQueue() // NOTE(review): async void, so the caller cannot await this loop or observe an exception thrown inside it
+ {
+ while (true)
+ {
+ lock (this) // the queue is only inspected and swapped while holding the lock
+ {
+ if (processQueue == null) // NOTE(review): shutdown is detected via processQueue, not waiterQueue -- assumes both are cleared together; confirm
+ {
+ // we have shut down, so exit this daemon
+ return;
+ }
+
+ Queue cleanedQueue = new Queue();
+ foreach (ProcessWaiter w in waiterQueue) // enumerates in queue order, so surviving waiters keep their relative order
+ {
+ lock (w) // NOTE(review): presumably Unclaimed is updated under the per-waiter lock -- verify against ProcessWaiter
+ {
+ if (w.Unclaimed)
+ {
+ cleanedQueue.Enqueue(w);
+ }
+ }
+ }
+ waiterQueue = cleanedQueue; // the old queue, including claimed entries, is no longer referenced from this field
+ }
+
+ // clean again in a second; the delay is awaited after the lock has been released
+ await Task.Delay(1000);
+ }
+ }
+
///
/// add a schedulable process. If there is an unclaimed computer waiting, the
/// process will be assigned to the computer and the computer's Task will be
@@ -305,7 +380,7 @@ namespace Microsoft.Research.Dryad.LocalScheduler
}
}
- // even if there are processes, they may have been claimed by ther computers
+ // even if there are processes, they may have been claimed by other computers
// already, so use a loop here
while (active && processQueue.Count > 0 && !claimed)
{
diff --git a/XmlDoc/Content/Resources/Running a job on HDInsight.aml b/XmlDoc/Content/Resources/Running a job on HDInsight.aml
index 4f422be..97970e3 100644
--- a/XmlDoc/Content/Resources/Running a job on HDInsight.aml
+++ b/XmlDoc/Content/Resources/Running a job on HDInsight.aml
@@ -127,6 +127,11 @@
a completed application. We have tried to report errors in user application code back so that they are visible in the
DryadLINQ Job Browser to avoid the need to consult
the logs.
+
+
+
+
+