Pass cancellation token through all input output tasks

This commit is contained in:
Malte Rosenbjerg 2020-05-11 00:34:17 +02:00
parent 48bb95e178
commit b854d5b43b
11 changed files with 30 additions and 23 deletions

View file

@ -2,6 +2,7 @@
using System.Drawing; using System.Drawing;
using System.Drawing.Imaging; using System.Drawing.Imaging;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using FFMpegCore.Pipes; using FFMpegCore.Pipes;
@ -39,7 +40,7 @@ public void Serialize(System.IO.Stream stream)
} }
} }
public async Task SerializeAsync(System.IO.Stream stream) public async Task SerializeAsync(System.IO.Stream stream, CancellationToken token)
{ {
var data = Source.LockBits(new Rectangle(0, 0, Width, Height), ImageLockMode.ReadOnly, Source.PixelFormat); var data = Source.LockBits(new Rectangle(0, 0, Width, Height), ImageLockMode.ReadOnly, Source.PixelFormat);
@ -47,7 +48,7 @@ public async Task SerializeAsync(System.IO.Stream stream)
{ {
var buffer = new byte[data.Stride * data.Height]; var buffer = new byte[data.Stride * data.Height];
Marshal.Copy(data.Scan0, buffer, 0, buffer.Length); Marshal.Copy(data.Scan0, buffer, 0, buffer.Length);
await stream.WriteAsync(buffer, 0, buffer.Length); await stream.WriteAsync(buffer, 0, buffer.Length, token);
} }
finally finally
{ {

View file

@ -8,7 +8,7 @@ namespace FFMpegCore.Arguments
/// <summary> /// <summary>
/// Represents input parameter for a named pipe /// Represents input parameter for a named pipe
/// </summary> /// </summary>
public class InputPipeArgument : PipeArgument public class InputPipeArgument : PipeArgument, IInputArgument
{ {
public readonly IPipeDataWriter Writer; public readonly IPipeDataWriter Writer;
@ -24,7 +24,7 @@ public override async Task ProcessDataAsync(CancellationToken token)
await Pipe.WaitForConnectionAsync(token).ConfigureAwait(false); await Pipe.WaitForConnectionAsync(token).ConfigureAwait(false);
if (!Pipe.IsConnected) if (!Pipe.IsConnected)
throw new TaskCanceledException(); throw new TaskCanceledException();
await Writer.WriteDataAsync(Pipe).ConfigureAwait(false); await Writer.WriteDataAsync(Pipe, token).ConfigureAwait(false);
} }
} }
} }

View file

@ -5,7 +5,7 @@
namespace FFMpegCore.Arguments namespace FFMpegCore.Arguments
{ {
public class OutputPipeArgument : PipeArgument public class OutputPipeArgument : PipeArgument, IOutputArgument
{ {
public readonly IPipeDataReader Reader; public readonly IPipeDataReader Reader;
@ -21,7 +21,7 @@ public override async Task ProcessDataAsync(CancellationToken token)
await Pipe.WaitForConnectionAsync(token).ConfigureAwait(false); await Pipe.WaitForConnectionAsync(token).ConfigureAwait(false);
if (!Pipe.IsConnected) if (!Pipe.IsConnected)
throw new TaskCanceledException(); throw new TaskCanceledException();
await Reader.ReadDataAsync(Pipe).ConfigureAwait(false); await Reader.ReadDataAsync(Pipe, token).ConfigureAwait(false);
} }
} }
} }

View file

@ -6,7 +6,7 @@
namespace FFMpegCore.Arguments namespace FFMpegCore.Arguments
{ {
public abstract class PipeArgument : IInputArgument, IOutputArgument public abstract class PipeArgument
{ {
private string PipeName { get; } private string PipeName { get; }
public string PipePath => PipeHelpers.GetPipePath(PipeName); public string PipePath => PipeHelpers.GetPipePath(PipeName);

View file

@ -102,9 +102,9 @@ internal void Pre()
_inputArgument.Pre(); _inputArgument.Pre();
_outputArgument.Pre(); _outputArgument.Pre();
} }
internal Task During(CancellationToken? cancellationToken = null) internal async Task During(CancellationToken? cancellationToken = null)
{ {
return Task.WhenAll(_inputArgument.During(cancellationToken), _outputArgument.During(cancellationToken)); await Task.WhenAll(_inputArgument.During(cancellationToken), _outputArgument.During(cancellationToken)).ConfigureAwait(false);
} }
internal void Post() internal void Post()
{ {

View file

@ -1,11 +1,12 @@
using System.Threading.Tasks; using System.Threading;
using System.Threading.Tasks;
namespace FFMpegCore.Pipes namespace FFMpegCore.Pipes
{ {
public interface IPipeDataReader public interface IPipeDataReader
{ {
void ReadData(System.IO.Stream stream); void ReadData(System.IO.Stream stream);
Task ReadDataAsync(System.IO.Stream stream); Task ReadDataAsync(System.IO.Stream stream, CancellationToken token);
string GetFormat(); string GetFormat();
} }
} }

View file

@ -1,4 +1,5 @@
using System.Threading.Tasks; using System.Threading;
using System.Threading.Tasks;
namespace FFMpegCore.Pipes namespace FFMpegCore.Pipes
{ {
@ -9,6 +10,6 @@ public interface IPipeDataWriter
{ {
string GetFormat(); string GetFormat();
void WriteData(System.IO.Stream pipe); void WriteData(System.IO.Stream pipe);
Task WriteDataAsync(System.IO.Stream pipe); Task WriteDataAsync(System.IO.Stream pipe, CancellationToken token);
} }
} }

View file

@ -1,4 +1,5 @@
using System.Threading.Tasks; using System.Threading;
using System.Threading.Tasks;
namespace FFMpegCore.Pipes namespace FFMpegCore.Pipes
{ {
@ -12,6 +13,6 @@ public interface IVideoFrame
string Format { get; } string Format { get; }
void Serialize(System.IO.Stream pipe); void Serialize(System.IO.Stream pipe);
Task SerializeAsync(System.IO.Stream pipe); Task SerializeAsync(System.IO.Stream pipe, CancellationToken token);
} }
} }

View file

@ -1,5 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using FFMpegCore.Exceptions; using FFMpegCore.Exceptions;
@ -59,16 +60,16 @@ public void WriteData(System.IO.Stream stream)
} }
} }
public async Task WriteDataAsync(System.IO.Stream stream) public async Task WriteDataAsync(System.IO.Stream stream, CancellationToken token)
{ {
if (_framesEnumerator.Current != null) if (_framesEnumerator.Current != null)
{ {
await _framesEnumerator.Current.SerializeAsync(stream).ConfigureAwait(false); await _framesEnumerator.Current.SerializeAsync(stream, token).ConfigureAwait(false);
} }
while (_framesEnumerator.MoveNext()) while (_framesEnumerator.MoveNext())
{ {
await _framesEnumerator.Current!.SerializeAsync(stream).ConfigureAwait(false); await _framesEnumerator.Current!.SerializeAsync(stream, token).ConfigureAwait(false);
} }
} }

View file

@ -1,4 +1,5 @@
using System.Threading.Tasks; using System.Threading;
using System.Threading.Tasks;
namespace FFMpegCore.Pipes namespace FFMpegCore.Pipes
{ {
@ -16,8 +17,8 @@ public StreamPipeDataReader(System.IO.Stream destanationStream)
public void ReadData(System.IO.Stream stream) => public void ReadData(System.IO.Stream stream) =>
stream.CopyTo(DestanationStream, BlockSize); stream.CopyTo(DestanationStream, BlockSize);
public Task ReadDataAsync(System.IO.Stream stream) => public Task ReadDataAsync(System.IO.Stream stream, CancellationToken token) =>
stream.CopyToAsync(DestanationStream, BlockSize); stream.CopyToAsync(DestanationStream, BlockSize, token);
public string GetFormat() public string GetFormat()
{ {

View file

@ -1,4 +1,5 @@
using System.Threading.Tasks; using System.Threading;
using System.Threading.Tasks;
namespace FFMpegCore.Pipes namespace FFMpegCore.Pipes
{ {
@ -18,7 +19,7 @@ public StreamPipeDataWriter(System.IO.Stream stream)
public void WriteData(System.IO.Stream pipe) => Source.CopyTo(pipe, BlockSize); public void WriteData(System.IO.Stream pipe) => Source.CopyTo(pipe, BlockSize);
public Task WriteDataAsync(System.IO.Stream pipe) => Source.CopyToAsync(pipe, BlockSize); public Task WriteDataAsync(System.IO.Stream pipe, CancellationToken token) => Source.CopyToAsync(pipe, BlockSize, token);
public string GetFormat() => StreamFormat; public string GetFormat() => StreamFormat;
} }