Make process cancellable

This commit is contained in:
Malte Rosenbjerg 2020-05-12 23:52:07 +02:00
parent a754d57421
commit b2085c4a8b
2 changed files with 84 additions and 28 deletions

View file

@ -7,6 +7,7 @@
using System.Drawing.Imaging; using System.Drawing.Imaging;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
using System.Threading.Tasks;
using FFMpegCore.Arguments; using FFMpegCore.Arguments;
using FFMpegCore.Exceptions; using FFMpegCore.Exceptions;
using FFMpegCore.Pipes; using FFMpegCore.Pipes;
@ -296,9 +297,9 @@ public void Video_ToMP4_Args_StreamPipe()
} }
[TestMethod, Timeout(10000)] [TestMethod, Timeout(10000)]
public void Video_ToMP4_Args_StreamOutputPipe_Async_Failure() public async Task Video_ToMP4_Args_StreamOutputPipe_Async_Failure()
{ {
Assert.ThrowsExceptionAsync<FFMpegException>(async () => await Assert.ThrowsExceptionAsync<FFMpegException>(async () =>
{ {
await using var ms = new MemoryStream(); await using var ms = new MemoryStream();
var pipeSource = new StreamPipeDataReader(ms); var pipeSource = new StreamPipeDataReader(ms);
@ -636,5 +637,26 @@ public void Video_TranscodeInMemory()
Assert.AreEqual(vi.PrimaryVideoStream.Width, 128); Assert.AreEqual(vi.PrimaryVideoStream.Width, 128);
Assert.AreEqual(vi.PrimaryVideoStream.Height, 128); Assert.AreEqual(vi.PrimaryVideoStream.Height, 128);
} }
[TestMethod]
public async Task Video_Cancel_Async()
{
await using var resStream = new MemoryStream();
var reader = new StreamPipeDataReader(resStream);
var writer = new RawVideoPipeDataWriter(BitmapSource.CreateBitmaps(256, System.Drawing.Imaging.PixelFormat.Format24bppRgb, 128, 128));
var task = FFMpegArguments
.FromPipe(writer)
.WithVideoCodec("vp9")
.ForceFormat("webm")
.OutputToPipe(reader)
.CancellableThrough(out var cancel)
.ProcessAsynchronously(false);
cancel();
var result = await task;
Assert.IsFalse(result);
}
} }
} }

View file

@ -26,27 +26,36 @@ internal FFMpegArgumentProcessor(FFMpegArguments ffMpegArguments)
public string Arguments => _ffMpegArguments.Text; public string Arguments => _ffMpegArguments.Text;
public FFMpegArgumentProcessor NotifyOnProgress(Action<double>? onPercentageProgress, TimeSpan? totalTimeSpan) private event EventHandler _cancelEvent;
public FFMpegArgumentProcessor NotifyOnProgress(Action<double> onPercentageProgress, TimeSpan totalTimeSpan)
{ {
_totalTimespan = totalTimeSpan; _totalTimespan = totalTimeSpan;
_onPercentageProgress = onPercentageProgress; _onPercentageProgress = onPercentageProgress;
return this; return this;
} }
public FFMpegArgumentProcessor NotifyOnProgress(Action<TimeSpan>? onTimeProgress) public FFMpegArgumentProcessor NotifyOnProgress(Action<TimeSpan> onTimeProgress)
{ {
_onTimeProgress = onTimeProgress; _onTimeProgress = onTimeProgress;
return this; return this;
} }
public FFMpegArgumentProcessor CancellableThrough(out Action cancel)
{
cancel = () => _cancelEvent?.Invoke(this, EventArgs.Empty);
return this;
}
public bool ProcessSynchronously(bool throwOnError = true) public bool ProcessSynchronously(bool throwOnError = true)
{ {
FFMpegHelper.RootExceptionCheck(FFMpegOptions.Options.RootDirectory); var instance = PrepareInstance(out var cancellationTokenSource, out var errorCode);
using var instance = new Instance(FFMpegOptions.Options.FFmpegBinary(), _ffMpegArguments.Text);
instance.DataReceived += OutputData;
var errorCode = -1;
_ffMpegArguments.Pre(); void OnCancelEvent(object sender, EventArgs args)
{
instance?.SendInput("q");
cancellationTokenSource.Cancel();
}
_cancelEvent += OnCancelEvent;
var cancellationTokenSource = new CancellationTokenSource(); _ffMpegArguments.Pre();
try try
{ {
Task.WaitAll(instance.FinishedRunning().ContinueWith(t => Task.WaitAll(instance.FinishedRunning().ContinueWith(t =>
@ -57,34 +66,35 @@ public bool ProcessSynchronously(bool throwOnError = true)
} }
catch (Exception e) catch (Exception e)
{ {
if (!throwOnError) if (!HandleException(throwOnError, e, instance)) return false;
return false;
throw new FFMpegException(FFMpegExceptionType.Process, "Exception thrown during processing", e,
string.Join("\n", instance.ErrorData));
} }
finally finally
{ {
_cancelEvent -= OnCancelEvent;
_ffMpegArguments.Post(); _ffMpegArguments.Post();
} }
if (throwOnError && errorCode != 0) if (throwOnError && errorCode != 0)
throw new FFMpegException(FFMpegExceptionType.Conversion, string.Join("\n", instance.ErrorData)); throw new FFMpegException(FFMpegExceptionType.Conversion, string.Join("\n", instance.ErrorData));
_onPercentageProgress?.Invoke(100.0);
if (_totalTimespan.HasValue) _onTimeProgress?.Invoke(_totalTimespan.Value);
return errorCode == 0; return errorCode == 0;
} }
public async Task<bool> ProcessAsynchronously(bool throwOnError = true) public async Task<bool> ProcessAsynchronously(bool throwOnError = true)
{ {
FFMpegHelper.RootExceptionCheck(FFMpegOptions.Options.RootDirectory); using var instance = PrepareInstance(out var cancellationTokenSource, out var errorCode);
using var instance = new Instance(FFMpegOptions.Options.FFmpegBinary(), _ffMpegArguments.Text);
if (_onTimeProgress != null || (_onPercentageProgress != null && _totalTimespan != null))
instance.DataReceived += OutputData;
var errorCode = -1;
_ffMpegArguments.Pre(); void OnCancelEvent(object sender, EventArgs args)
{
instance?.SendInput("q");
cancellationTokenSource.Cancel();
}
_cancelEvent += OnCancelEvent;
var cancellationTokenSource = new CancellationTokenSource(); _ffMpegArguments.Pre();
try try
{ {
await Task.WhenAll(instance.FinishedRunning().ContinueWith(t => await Task.WhenAll(instance.FinishedRunning().ContinueWith(t =>
@ -95,24 +105,48 @@ await Task.WhenAll(instance.FinishedRunning().ContinueWith(t =>
} }
catch (Exception e) catch (Exception e)
{ {
if (!throwOnError) if (!HandleException(throwOnError, e, instance)) return false;
return false;
throw new FFMpegException(FFMpegExceptionType.Process, "Exception thrown during processing", e,
string.Join("\n", instance.ErrorData));
} }
finally finally
{ {
_cancelEvent -= OnCancelEvent;
_ffMpegArguments.Post(); _ffMpegArguments.Post();
} }
if (throwOnError && errorCode != 0) if (throwOnError && errorCode != 0)
throw new FFMpegException(FFMpegExceptionType.Conversion, string.Join("\n", instance.ErrorData)); throw new FFMpegException(FFMpegExceptionType.Conversion, string.Join("\n", instance.ErrorData));
_onPercentageProgress?.Invoke(100.0);
if (_totalTimespan.HasValue) _onTimeProgress?.Invoke(_totalTimespan.Value);
return errorCode == 0; return errorCode == 0;
} }
private Instance PrepareInstance(out CancellationTokenSource cancellationTokenSource, out int errorCode)
{
FFMpegHelper.RootExceptionCheck(FFMpegOptions.Options.RootDirectory);
var instance = new Instance(FFMpegOptions.Options.FFmpegBinary(), _ffMpegArguments.Text);
instance.DataReceived += OutputData;
cancellationTokenSource = new CancellationTokenSource();
if (_onTimeProgress != null || (_onPercentageProgress != null && _totalTimespan != null))
instance.DataReceived += OutputData;
errorCode = -1;
return instance;
}
private static bool HandleException(bool throwOnError, Exception e, Instance instance)
{
if (!throwOnError)
return false;
throw new FFMpegException(FFMpegExceptionType.Process, "Exception thrown during processing", e,
string.Join("\n", instance.ErrorData));
}
private static readonly Regex ProgressRegex = new Regex(@"time=(\d\d:\d\d:\d\d.\d\d?)", RegexOptions.Compiled); private static readonly Regex ProgressRegex = new Regex(@"time=(\d\d:\d\d:\d\d.\d\d?)", RegexOptions.Compiled);
private Action<double>? _onPercentageProgress; private Action<double>? _onPercentageProgress;
private Action<TimeSpan>? _onTimeProgress; private Action<TimeSpan>? _onTimeProgress;