Skip to content

Commit

Permalink
Remove code that causes memory leaks (#888)
Browse files Browse the repository at this point in the history
### Changes
- Replace two usages of `ConcurrentBag` with `ConcurrentStack`, which
has a much simpler implementation that has no risk of memory leaks.
ConcurrentBag was causing leaks. More information is here:
https://stackoverflow.com/questions/5353164/possible-memoryleak-in-concurrentbag
- Remove cancellation support from AsyncQueue, since the cancellation
token would gain a reference to the item being dequeued, which a caller
might not expect and which causes a memory leak if the cancellation
token is kept alive.
- Add a 'Clear' method to 'AsyncQueue' to allow cancelling all customers
- Fix a surprising memory leak in `CustomStackSizePoolTaskScheduler`
- Enable using an ExecutionEngine with a custom TaskScheduler

### Testing
- Tested with a profiler that the above changes resolve the issue that
each Boogie program created by Dafny was never garbage collected.
  • Loading branch information
keyboardDrummer authored May 22, 2024
1 parent 44a7a93 commit 60a1259
Show file tree
Hide file tree
Showing 11 changed files with 62 additions and 72 deletions.
2 changes: 1 addition & 1 deletion Source/Core/AST/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ public List<GlobalVariable /*!*/> /*!*/ GlobalVariables
}
}

public readonly ConcurrentBag<TrackedNodeComponent> AllCoveredElements = new();
public readonly ConcurrentStack<TrackedNodeComponent> AllCoveredElements = new();

public IEnumerable<Block> Blocks()
{
Expand Down
13 changes: 9 additions & 4 deletions Source/Core/AsyncQueue.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
#nullable enable
using System.Collections.Generic;
using System.Linq;
using System.Threading;
Expand Down Expand Up @@ -47,20 +47,25 @@ public IEnumerable<T> Items
}
}

public void Clear() {
while (customers.TryDequeue(out var customer)) {
customer.TrySetCanceled();
}
}

public int Size => items.Count;

public Task<T> Dequeue(CancellationToken cancellationToken)
public Task<T> Dequeue()
{
lock (myLock) {
if (items.TryDequeue(out var result)) {
return Task.FromResult(result);
}

var source = new TaskCompletionSource<T>();
cancellationToken.Register(() => source.TrySetCanceled(cancellationToken));
customers.Enqueue(source);
// Ensure that the TrySetResult call in Enqueue completes immediately.
return source.Task.ContinueWith(t => t.Result, cancellationToken,
return source.Task.ContinueWith(t => t.Result, CancellationToken.None,
TaskContinuationOptions.RunContinuationsAsynchronously, TaskScheduler.Current);
}
}
Expand Down
2 changes: 1 addition & 1 deletion Source/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<!-- Target framework and package configuration -->
<PropertyGroup>
<Version>3.1.5</Version>
<Version>3.1.6</Version>
<TargetFramework>net6.0</TargetFramework>
<GeneratePackageOnBuild>false</GeneratePackageOnBuild>
<Authors>Boogie</Authors>
Expand Down
36 changes: 25 additions & 11 deletions Source/ExecutionEngine/CustomStackSizePoolTaskScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,24 +62,38 @@ private void WorkLoop()
{
while (!disposeTokenSource.IsCancellationRequested)
{
try
{
var task = queue.Dequeue(disposeTokenSource.Token).Result;
TryExecuteTask(task);
}
catch (AggregateException e)
{
if (e.InnerException is TaskCanceledException)
{
break;
}
/*
* Previously the code from RunItem was inlined, but that caused something akin to a memory leak.
* It seems that variables declared inside the loop are not cleared across loop iterations
* So values assigned in iteration X can only be garbage collected until they have been reassigned
* in iteration X+1. If there is a long pause between that reassignment, which is the case here,
* then it may take a long time before dead memory can be garbage collected.
*
* Inlining RunItem and manually setting variables to null at the end of the loop body does not work,
* probably because such assignments are removed during C# compilation since they seem unused.
*/
RunItem();
}
}

private void RunItem()
{
try {
var task = queue.Dequeue().Result;
TryExecuteTask(task);
} catch(Exception e) {
if (e.GetBaseException() is OperationCanceledException) {
// Async queue cancels tasks when it is disposed, which happens when this scheduler is disposed
} else {
throw;
}
}
}

public void Dispose()
{
disposeTokenSource.Cancel();
queue.Clear();
foreach (var thread in threads)
{
thread.Join();
Expand Down
30 changes: 11 additions & 19 deletions Source/ExecutionEngine/ExecutionEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@
using VC;
using System.Runtime.Caching;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using Microsoft.Boogie.LeanAuto;
using Microsoft.Boogie.VCExprAST;
using VCGeneration;

namespace Microsoft.Boogie
Expand Down Expand Up @@ -55,26 +50,22 @@ public static int AutoRequestId(string id) {
static readonly CacheItemPolicy policy = new CacheItemPolicy
{ SlidingExpiration = new TimeSpan(0, 10, 0), Priority = CacheItemPriority.Default };

private const int stackSize = 16 * 1024 * 1024;
public const int StackSize = 16 * 1024 * 1024;

public ExecutionEngine(ExecutionEngineOptions options, VerificationResultCache cache)
: this(options, cache, CustomStackSizePoolTaskScheduler.Create(stackSize, options.VcsCores))
{
taskSchedulerCreatedLocally = true;
}

public ExecutionEngine(ExecutionEngineOptions options, VerificationResultCache cache, CustomStackSizePoolTaskScheduler scheduler) {
public ExecutionEngine(ExecutionEngineOptions options, VerificationResultCache cache, TaskScheduler scheduler, bool disposeScheduler = false) {
Options = options;
Cache = cache;
CheckerPool = new CheckerPool(options);
verifyImplementationSemaphore = new SemaphoreSlim(Options.VcsCores);

largeThreadScheduler = scheduler;
this.disposeScheduler = disposeScheduler;
largeThreadTaskFactory = new(CancellationToken.None, TaskCreationOptions.None, TaskContinuationOptions.None, largeThreadScheduler);
}

public static ExecutionEngine CreateWithoutSharedCache(ExecutionEngineOptions options) {
return new ExecutionEngine(options, new VerificationResultCache());
return new ExecutionEngine(options, new VerificationResultCache(),
CustomStackSizePoolTaskScheduler.Create(StackSize, options.VcsCores), true);
}

public ExecutionEngineOptions Options { get; }
Expand All @@ -95,8 +86,8 @@ static readonly ConcurrentDictionary<string, TimeSpan>
static readonly ConcurrentDictionary<string, CancellationTokenSource> RequestIdToCancellationTokenSource =
new ConcurrentDictionary<string, CancellationTokenSource>();

private readonly CustomStackSizePoolTaskScheduler largeThreadScheduler;
private bool taskSchedulerCreatedLocally = false;
private readonly TaskScheduler largeThreadScheduler;
private readonly bool disposeScheduler;

public async Task<bool> ProcessFiles(TextWriter output, IList<string> fileNames, bool lookForSnapshots = true,
string programId = null) {
Expand All @@ -115,7 +106,8 @@ public async Task<bool> ProcessFiles(TextWriter output, IList<string> fileNames,
var success = true;
foreach (var snapshots in snapshotsByVersion) {
// BUG: Reusing checkers during snapshots doesn't work, even though it should. We create a new engine (and thus checker pool) to workaround this.
using var engine = new ExecutionEngine(Options, Cache);
using var engine = new ExecutionEngine(Options, Cache,
CustomStackSizePoolTaskScheduler.Create(StackSize, Options.VcsCores), true);
success &= await engine.ProcessFiles(output, new List<string>(snapshots), false, programId);
}
return success;
Expand Down Expand Up @@ -1437,8 +1429,8 @@ private static void WriteErrorInformationToXmlSink(XmlSink sink, ErrorInformatio
public void Dispose()
{
CheckerPool.Dispose();
if (taskSchedulerCreatedLocally) {
largeThreadScheduler.Dispose();
if (disposeScheduler) {
(largeThreadScheduler as IDisposable)?.Dispose();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion Source/Provers/SMTLib/SExprParser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public void AddLine(string line)

Task<string> ReadLine()
{
return sexpLines.Dequeue(CancellationToken.None);
return sexpLines.Dequeue();
}

async Task<char> SkipWs()
Expand Down
2 changes: 1 addition & 1 deletion Source/Provers/SMTLib/SMTLibProcess.cs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ public override void AddErrorHandler(Action<string> handler)
/// </summary>
Task<string> ReadProver()
{
return solverOutput.Dequeue(CancellationToken.None);
return solverOutput.Dequeue();
}

void DisposeProver()
Expand Down
34 changes: 6 additions & 28 deletions Source/UnitTests/CoreTests/AsyncQueueTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,23 @@ public async Task DequeueOrder()
var queue = new AsyncQueue<int>();
var firstValue = 1;
var secondValue = 2;
var waitingDequeueTask = queue.Dequeue(CancellationToken.None);
var waitingDequeueTask = queue.Dequeue();
queue.Enqueue(firstValue);
queue.Enqueue(secondValue);
var firstResult = await waitingDequeueTask;
var secondResult = await queue.Dequeue(CancellationToken.None);
var secondResult = await queue.Dequeue();

Assert.AreEqual(firstValue, firstResult);
Assert.AreEqual(secondValue, secondResult);
}

[Test]
public async Task CancellationSupport()
{
var queue = new AsyncQueue<int>();
var source = new CancellationTokenSource();
var firstResultTask = queue.Dequeue(source.Token);
var secondResultTask = queue.Dequeue(CancellationToken.None);
var firstValue = 3;
source.Cancel();
queue.Enqueue(firstValue);

try {
await firstResultTask;
Assert.True(false);
}
catch (TaskCanceledException) {
Assert.True(firstResultTask.IsCanceled);
}
var secondResult = await secondResultTask;
Assert.AreEqual(firstValue, secondResult);
}

[Test]
public void ItemsAreKeptInOrder()
{
var queue = new AsyncQueue<int>();
var tasks = new List<Task<int>>();
for (int i = 0; i < 100; i++) {
tasks.Add(queue.Dequeue(CancellationToken.None));
tasks.Add(queue.Dequeue());
}
for (int i = 0; i < 100; i++) {
queue.Enqueue(i);
Expand All @@ -72,7 +50,7 @@ public async Task EnqueueReturnsBeforeCompletingDequeueTask()
var queue = new AsyncQueue<int>();
var semaphore = new Semaphore(0, 1);

var firstResultTask = queue.Dequeue(CancellationToken.None);
var firstResultTask = queue.Dequeue();
var secondValue = 2;
var mappedTask = firstResultTask.ContinueWith(t =>
{
Expand Down Expand Up @@ -105,13 +83,13 @@ void EnqueueAction()
void DequeueAction1()
{
for (int i = 0; i < amount; i++) {
tasks1.Add(queue.Dequeue(CancellationToken.None));
tasks1.Add(queue.Dequeue());
}
}
void DequeueAction2()
{
for (int i = 0; i < amount; i++) {
tasks2.Add(queue.Dequeue(CancellationToken.None));
tasks2.Add(queue.Dequeue());
}
}

Expand Down
3 changes: 2 additions & 1 deletion Source/UnitTests/ExecutionEngineTests/ExecutionEngineTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public async Task DisposeCleansUpThreads()
var options = new CommandLineOptions(TextWriter.Null, new ConsolePrinter());
options.VcsCores = 10;
int beforeCreation = Process.GetCurrentProcess().Threads.Count;
var engine = new ExecutionEngine(options, new VerificationResultCache());
var engine = new ExecutionEngine(options, new VerificationResultCache(),
CustomStackSizePoolTaskScheduler.Create(ExecutionEngine.StackSize, options.VcsCores), true);
engine.Dispose();
for (int i = 0; i < 50; i++)
{
Expand Down
8 changes: 4 additions & 4 deletions Source/VCGeneration/CheckerPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace VC
{
public class CheckerPool
{
private readonly ConcurrentBag<Checker> availableCheckers = new();
private readonly ConcurrentStack<Checker> availableCheckers = new();
private readonly SemaphoreSlim checkersSemaphore;
private bool disposed;

Expand All @@ -29,7 +29,7 @@ public async Task<Checker> FindCheckerFor(Program program, Split? split, Cancell

await checkersSemaphore.WaitAsync(cancellationToken);
try {
if (!availableCheckers.TryTake(out var checker)) {
if (!availableCheckers.TryPop(out var checker)) {
checker ??= CreateNewChecker();
}
PrepareChecker(program, split, checker);
Expand Down Expand Up @@ -57,7 +57,7 @@ public void Dispose()
lock(availableCheckers)
{
disposed = true;
foreach (var checker in availableCheckers.ToArray()) {
while (availableCheckers.TryPop(out var checker)) {
checker.Close();
}
}
Expand Down Expand Up @@ -86,7 +86,7 @@ public void AddChecker(Checker checker)
checker.Close();
return;
}
availableCheckers.Add(checker);
availableCheckers.Push(checker);
checkersSemaphore.Release();
}
}
Expand Down
2 changes: 1 addition & 1 deletion Source/VCGeneration/VerificationConditionGenerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ void ObjectInvariant()

public override void AddCoveredElement(TrackedNodeComponent elt)
{
program.AllCoveredElements.Add(elt);
program.AllCoveredElements.Push(elt);
split.CoveredElements.Add(elt);
}

Expand Down

0 comments on commit 60a1259

Please sign in to comment.