using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Reflection; using System.Resources; using System.Runtime.CompilerServices; using System.Runtime.ExceptionServices; using System.Runtime.InteropServices; using System.Runtime.Serialization; using System.Runtime.Versioning; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Sources; using FxResources.System.Threading.Channels; using Internal; using Microsoft.CodeAnalysis; [assembly: CompilationRelaxations(8)] [assembly: RuntimeCompatibility(WrapNonExceptionThrows = true)] [assembly: Debuggable(DebuggableAttribute.DebuggingModes.IgnoreSymbolStoreSequencePoints)] [assembly: TargetFramework(".NETStandard,Version=v2.1", FrameworkDisplayName = ".NET Standard 2.1")] [assembly: AssemblyMetadata("Serviceable", "True")] [assembly: AssemblyMetadata("PreferInbox", "True")] [assembly: AssemblyDefaultAlias("System.Threading.Channels")] [assembly: NeutralResourcesLanguage("en-US")] [assembly: CLSCompliant(true)] [assembly: AssemblyMetadata("IsTrimmable", "True")] [assembly: AssemblyMetadata("IsAotCompatible", "True")] [assembly: DefaultDllImportSearchPaths(DllImportSearchPath.System32 | DllImportSearchPath.AssemblyDirectory)] [assembly: AssemblyCompany("Microsoft Corporation")] [assembly: AssemblyCopyright("© Microsoft Corporation. All rights reserved.")] [assembly: AssemblyDescription("Provides types for passing data between producers and consumers.\r\n\r\nCommonly Used Types:\r\nSystem.Threading.Channel\r\nSystem.Threading.Channel")] [assembly: AssemblyFileVersion("10.0.726.21808")] [assembly: AssemblyInformationalVersion("10.0.7+b16286c2284fecf303dbc12a0bb152476d662e44")] [assembly: AssemblyProduct("Microsoft® .NET")] [assembly: AssemblyTitle("System.Threading.Channels")] [assembly: AssemblyMetadata("RepositoryUrl", "https://github.com/dotnet/dotnet")] [assembly: AssemblyVersion("10.0.0.0")] [module: RefSafetyRules(11)] [module: System.Runtime.CompilerServices.NullablePublicOnly(false)] namespace Microsoft.CodeAnalysis { [CompilerGenerated] [Microsoft.CodeAnalysis.Embedded] internal sealed class EmbeddedAttribute : Attribute { } } namespace System.Runtime.CompilerServices { [CompilerGenerated] [Microsoft.CodeAnalysis.Embedded] [AttributeUsage(AttributeTargets.Class | AttributeTargets.Property | AttributeTargets.Field | AttributeTargets.Event | AttributeTargets.Parameter | AttributeTargets.ReturnValue | AttributeTargets.GenericParameter, AllowMultiple = false, Inherited = false)] internal sealed class NullableAttribute : Attribute { public readonly byte[] NullableFlags; public NullableAttribute(byte P_0) { NullableFlags = new byte[1] { P_0 }; } public NullableAttribute(byte[] P_0) { NullableFlags = P_0; } } [CompilerGenerated] [Microsoft.CodeAnalysis.Embedded] [AttributeUsage(AttributeTargets.Class | AttributeTargets.Struct | AttributeTargets.Method | AttributeTargets.Interface | AttributeTargets.Delegate, AllowMultiple = false, Inherited = false)] internal sealed class NullableContextAttribute : Attribute { public readonly byte Flag; public NullableContextAttribute(byte P_0) { Flag = P_0; } } [CompilerGenerated] [Microsoft.CodeAnalysis.Embedded] [AttributeUsage(AttributeTargets.Module, AllowMultiple = false, Inherited = false)] internal sealed class NullablePublicOnlyAttribute : Attribute { public readonly bool IncludesInternals; public NullablePublicOnlyAttribute(bool P_0) { IncludesInternals = P_0; } } [CompilerGenerated] [Microsoft.CodeAnalysis.Embedded] [AttributeUsage(AttributeTargets.Module, AllowMultiple = false, Inherited = false)] internal sealed class RefSafetyRulesAttribute : Attribute { public readonly int Version; public RefSafetyRulesAttribute(int P_0) { Version = P_0; } } [CompilerGenerated] [Microsoft.CodeAnalysis.Embedded] [AttributeUsage(AttributeTargets.Class | AttributeTargets.Struct | AttributeTargets.Enum | AttributeTargets.Method | AttributeTargets.Property | AttributeTargets.Field | AttributeTargets.Event | AttributeTargets.Interface | AttributeTargets.Delegate, AllowMultiple = false, Inherited = false)] internal sealed class ExtensionMarkerAttribute : Attribute { public ExtensionMarkerAttribute(string name) { } } } namespace FxResources.System.Threading.Channels { internal static class SR { } } namespace Internal { internal static class PaddingHelpers { internal const int CACHE_LINE_SIZE = 128; } [StructLayout(LayoutKind.Explicit, Size = 124)] internal struct PaddingFor32 { } } namespace System { [StructLayout(LayoutKind.Sequential, Size = 1)] internal readonly struct VoidResult { } internal static class Obsoletions { internal const string SharedUrlFormat = "https://aka.ms/dotnet-warnings/{0}"; internal const string SystemTextEncodingUTF7Message = "The UTF-7 encoding is insecure and should not be used. Consider using UTF-8 instead."; internal const string SystemTextEncodingUTF7DiagId = "SYSLIB0001"; internal const string PrincipalPermissionAttributeMessage = "PrincipalPermissionAttribute is not honored by the runtime and must not be used."; internal const string PrincipalPermissionAttributeDiagId = "SYSLIB0002"; internal const string CodeAccessSecurityMessage = "Code Access Security is not supported or honored by the runtime."; internal const string CodeAccessSecurityDiagId = "SYSLIB0003"; internal const string ConstrainedExecutionRegionMessage = "The Constrained Execution Region (CER) feature is not supported."; internal const string ConstrainedExecutionRegionDiagId = "SYSLIB0004"; internal const string GlobalAssemblyCacheMessage = "The Global Assembly Cache is not supported."; internal const string GlobalAssemblyCacheDiagId = "SYSLIB0005"; internal const string ThreadAbortMessage = "Thread.Abort is not supported and throws PlatformNotSupportedException."; internal const string ThreadResetAbortMessage = "Thread.ResetAbort is not supported and throws PlatformNotSupportedException."; internal const string ThreadAbortDiagId = "SYSLIB0006"; internal const string DefaultCryptoAlgorithmsMessage = "The default implementation of this cryptography algorithm is not supported."; internal const string DefaultCryptoAlgorithmsDiagId = "SYSLIB0007"; internal const string CreatePdbGeneratorMessage = "The CreatePdbGenerator API is not supported and throws PlatformNotSupportedException."; internal const string CreatePdbGeneratorDiagId = "SYSLIB0008"; internal const string AuthenticationManagerMessage = "AuthenticationManager is not supported. Methods will no-op or throw PlatformNotSupportedException."; internal const string AuthenticationManagerDiagId = "SYSLIB0009"; internal const string RemotingApisMessage = "This Remoting API is not supported and throws PlatformNotSupportedException."; internal const string RemotingApisDiagId = "SYSLIB0010"; internal const string BinaryFormatterMessage = "BinaryFormatter serialization is obsolete and should not be used. See https://aka.ms/binaryformatter for more information."; internal const string BinaryFormatterDiagId = "SYSLIB0011"; internal const string CodeBaseMessage = "Assembly.CodeBase and Assembly.EscapedCodeBase are only included for .NET Framework compatibility. Use Assembly.Location instead."; internal const string CodeBaseDiagId = "SYSLIB0012"; internal const string EscapeUriStringMessage = "Uri.EscapeUriString can corrupt the Uri string in some cases. Consider using Uri.EscapeDataString for query string components instead."; internal const string EscapeUriStringDiagId = "SYSLIB0013"; internal const string WebRequestMessage = "WebRequest, HttpWebRequest, ServicePoint, and WebClient are obsolete. Use HttpClient instead."; internal const string WebRequestDiagId = "SYSLIB0014"; internal const string DisablePrivateReflectionAttributeMessage = "DisablePrivateReflectionAttribute has no effect in .NET 6.0+."; internal const string DisablePrivateReflectionAttributeDiagId = "SYSLIB0015"; internal const string GetContextInfoMessage = "Use the Graphics.GetContextInfo overloads that accept arguments for better performance and fewer allocations."; internal const string GetContextInfoDiagId = "SYSLIB0016"; internal const string StrongNameKeyPairMessage = "Strong name signing is not supported and throws PlatformNotSupportedException."; internal const string StrongNameKeyPairDiagId = "SYSLIB0017"; internal const string ReflectionOnlyLoadingMessage = "ReflectionOnly loading is not supported and throws PlatformNotSupportedException."; internal const string ReflectionOnlyLoadingDiagId = "SYSLIB0018"; internal const string RuntimeEnvironmentMessage = "RuntimeEnvironment members SystemConfigurationFile, GetRuntimeInterfaceAsIntPtr, and GetRuntimeInterfaceAsObject are not supported and throw PlatformNotSupportedException."; internal const string RuntimeEnvironmentDiagId = "SYSLIB0019"; internal const string JsonSerializerOptionsIgnoreNullValuesMessage = "JsonSerializerOptions.IgnoreNullValues is obsolete. To ignore null values when serializing, set DefaultIgnoreCondition to JsonIgnoreCondition.WhenWritingNull."; internal const string JsonSerializerOptionsIgnoreNullValuesDiagId = "SYSLIB0020"; internal const string DerivedCryptographicTypesMessage = "Derived cryptographic types are obsolete. Use the Create method on the base type instead."; internal const string DerivedCryptographicTypesDiagId = "SYSLIB0021"; internal const string RijndaelMessage = "The Rijndael and RijndaelManaged types are obsolete. Use Aes instead."; internal const string RijndaelDiagId = "SYSLIB0022"; internal const string RNGCryptoServiceProviderMessage = "RNGCryptoServiceProvider is obsolete. To generate a random number, use one of the RandomNumberGenerator static methods instead."; internal const string RNGCryptoServiceProviderDiagId = "SYSLIB0023"; internal const string AppDomainCreateUnloadMessage = "Creating and unloading AppDomains is not supported and throws an exception."; internal const string AppDomainCreateUnloadDiagId = "SYSLIB0024"; internal const string SuppressIldasmAttributeMessage = "SuppressIldasmAttribute has no effect in .NET 6.0+."; internal const string SuppressIldasmAttributeDiagId = "SYSLIB0025"; internal const string X509CertificateImmutableMessage = "X509Certificate and X509Certificate2 are immutable. Use X509CertificateLoader to create a new certificate."; internal const string X509CertificateImmutableDiagId = "SYSLIB0026"; internal const string PublicKeyPropertyMessage = "PublicKey.Key is obsolete. Use the appropriate method to get the public key, such as GetRSAPublicKey."; internal const string PublicKeyPropertyDiagId = "SYSLIB0027"; internal const string X509CertificatePrivateKeyMessage = "X509Certificate2.PrivateKey is obsolete. Use the appropriate method to get the private key, such as GetRSAPrivateKey, or use the CopyWithPrivateKey method to create a new instance with a private key."; internal const string X509CertificatePrivateKeyDiagId = "SYSLIB0028"; internal const string ProduceLegacyHmacValuesMessage = "ProduceLegacyHmacValues is obsolete. Producing legacy HMAC values is not supported."; internal const string ProduceLegacyHmacValuesDiagId = "SYSLIB0029"; internal const string UseManagedSha1Message = "HMACSHA1 always uses the algorithm implementation provided by the platform. Use a constructor without the useManagedSha1 parameter."; internal const string UseManagedSha1DiagId = "SYSLIB0030"; internal const string CryptoConfigEncodeOIDMessage = "EncodeOID is obsolete. Use the ASN.1 functionality provided in System.Formats.Asn1."; internal const string CryptoConfigEncodeOIDDiagId = "SYSLIB0031"; internal const string CorruptedStateRecoveryMessage = "Recovery from corrupted process state exceptions is not supported; HandleProcessCorruptedStateExceptionsAttribute is ignored."; internal const string CorruptedStateRecoveryDiagId = "SYSLIB0032"; internal const string Rfc2898CryptDeriveKeyMessage = "Rfc2898DeriveBytes.CryptDeriveKey is obsolete and is not supported. Use PasswordDeriveBytes.CryptDeriveKey instead."; internal const string Rfc2898CryptDeriveKeyDiagId = "SYSLIB0033"; internal const string CmsSignerCspParamsCtorMessage = "CmsSigner(CspParameters) is obsolete and is not supported. Use an alternative constructor instead."; internal const string CmsSignerCspParamsCtorDiagId = "SYSLIB0034"; internal const string SignerInfoCounterSigMessage = "ComputeCounterSignature without specifying a CmsSigner is obsolete and is not supported. Use the overload that accepts a CmsSigner."; internal const string SignerInfoCounterSigDiagId = "SYSLIB0035"; internal const string RegexCompileToAssemblyMessage = "Regex.CompileToAssembly is obsolete and not supported. Use the GeneratedRegexAttribute with the regular expression source generator instead."; internal const string RegexCompileToAssemblyDiagId = "SYSLIB0036"; internal const string AssemblyNameMembersMessage = "AssemblyName members HashAlgorithm, ProcessorArchitecture, and VersionCompatibility are obsolete and not supported."; internal const string AssemblyNameMembersDiagId = "SYSLIB0037"; internal const string SystemDataSerializationFormatBinaryMessage = "SerializationFormat.Binary is obsolete and should not be used. See https://aka.ms/serializationformat-binary-obsolete for more information."; internal const string SystemDataSerializationFormatBinaryDiagId = "SYSLIB0038"; internal const string TlsVersion10and11Message = "TLS versions 1.0 and 1.1 have known vulnerabilities and are not recommended. Use a newer TLS version instead, or use SslProtocols.None to defer to OS defaults."; internal const string TlsVersion10and11DiagId = "SYSLIB0039"; internal const string EncryptionPolicyMessage = "EncryptionPolicy.NoEncryption and AllowEncryption significantly reduce security and should not be used in production code."; internal const string EncryptionPolicyDiagId = "SYSLIB0040"; internal const string EccXmlExportImportMessage = "ToXmlString and FromXmlString have no implementation for ECC types, and are obsolete. Use a standard import and export format such as ExportSubjectPublicKeyInfo or ImportSubjectPublicKeyInfo for public keys and ExportPkcs8PrivateKey or ImportPkcs8PrivateKey for private keys."; internal const string EccXmlExportImportDiagId = "SYSLIB0042"; internal const string EcDhPublicKeyBlobMessage = "ECDiffieHellmanPublicKey.ToByteArray() and the associated constructor do not have a consistent and interoperable implementation on all platforms. Use ECDiffieHellmanPublicKey.ExportSubjectPublicKeyInfo() instead."; internal const string EcDhPublicKeyBlobDiagId = "SYSLIB0043"; internal const string AssemblyNameCodeBaseMessage = "AssemblyName.CodeBase and AssemblyName.EscapedCodeBase are obsolete. Using them for loading an assembly is not supported."; internal const string AssemblyNameCodeBaseDiagId = "SYSLIB0044"; internal const string CryptoStringFactoryMessage = "Cryptographic factory methods accepting an algorithm name are obsolete. Use the parameterless Create factory method on the algorithm type instead."; internal const string CryptoStringFactoryDiagId = "SYSLIB0045"; internal const string ControlledExecutionRunMessage = "ControlledExecution.Run method may corrupt the process and should not be used in production code."; internal const string ControlledExecutionRunDiagId = "SYSLIB0046"; internal const string XmlSecureResolverMessage = "XmlSecureResolver is obsolete. Use XmlResolver.ThrowingResolver instead when attempting to forbid XML external entity resolution."; internal const string XmlSecureResolverDiagId = "SYSLIB0047"; internal const string RsaEncryptDecryptValueMessage = "RSA.EncryptValue and DecryptValue are not supported and throw NotSupportedException. Use RSA.Encrypt and RSA.Decrypt instead."; internal const string RsaEncryptDecryptDiagId = "SYSLIB0048"; internal const string JsonSerializerOptionsAddContextMessage = "JsonSerializerOptions.AddContext is obsolete. To register a JsonSerializerContext, use either the TypeInfoResolver or TypeInfoResolverChain properties."; internal const string JsonSerializerOptionsAddContextDiagId = "SYSLIB0049"; internal const string LegacyFormatterMessage = "Formatter-based serialization is obsolete and should not be used."; internal const string LegacyFormatterDiagId = "SYSLIB0050"; internal const string LegacyFormatterImplMessage = "This API supports obsolete formatter-based serialization. It should not be called or extended by application code."; internal const string LegacyFormatterImplDiagId = "SYSLIB0051"; internal const string RegexExtensibilityImplMessage = "This API supports obsolete mechanisms for Regex extensibility. It is not supported."; internal const string RegexExtensibilityDiagId = "SYSLIB0052"; internal const string AesGcmTagConstructorMessage = "AesGcm should indicate the required tag size for encryption and decryption. Use a constructor that accepts the tag size."; internal const string AesGcmTagConstructorDiagId = "SYSLIB0053"; internal const string ThreadVolatileReadWriteMessage = "Thread.VolatileRead and Thread.VolatileWrite are obsolete. Use Volatile.Read or Volatile.Write respectively instead."; internal const string ThreadVolatileReadWriteDiagId = "SYSLIB0054"; internal const string ArmIntrinsicPerformsUnsignedOperationMessage = "The underlying hardware instruction does not perform a signed saturate narrowing operation, and it always returns an unsigned result. Use the unsigned overload instead."; internal const string ArmIntrinsicPerformsUnsignedOperationDiagId = "SYSLIB0055"; internal const string LoadFromHashAlgorithmMessage = "LoadFrom with a custom AssemblyHashAlgorithm is obsolete. Use overloads without an AssemblyHashAlgorithm."; internal const string LoadFromHashAlgorithmDiagId = "SYSLIB0056"; internal const string X509CtorCertDataObsoleteMessage = "Loading certificate data through the constructor or Import is obsolete. Use X509CertificateLoader instead to load certificates."; internal const string X509CtorCertDataObsoleteDiagId = "SYSLIB0057"; internal const string TlsCipherAlgorithmEnumsMessage = "KeyExchangeAlgorithm, KeyExchangeStrength, CipherAlgorithm, CipherStrength, HashAlgorithm and HashStrength properties of SslStream are obsolete. Use NegotiatedCipherSuite instead."; internal const string TlsCipherAlgorithmEnumsDiagId = "SYSLIB0058"; internal const string SystemEventsEventsThreadShutdownMessage = "SystemEvents.EventsThreadShutdown callbacks are not run before the process exits. Use AppDomain.ProcessExit instead."; internal const string SystemEventsEventsThreadShutdownDiagId = "SYSLIB0059"; internal const string Rfc2898DeriveBytesCtorMessage = "The constructors on Rfc2898DeriveBytes are obsolete. Use the static Pbkdf2 method instead."; internal const string Rfc2898DeriveBytesCtorDiagId = "SYSLIB0060"; internal const string QueryableMinByMaxByTSourceObsoleteMessage = "The Queryable MinBy and MaxBy taking an IComparer are obsolete. Use the new ones that take an IComparer."; internal const string QueryableMinByMaxByTSourceObsoleteDiagId = "SYSLIB0061"; internal const string XsltSettingsEnableScriptMessage = "XSLT Script blocks are not supported."; internal const string XsltSettingsEnableScriptDiagId = "SYSLIB0062"; } internal static class SR { private static readonly bool s_usingResourceKeys = GetUsingResourceKeysSwitchValue(); private static ResourceManager s_resourceManager; internal static ResourceManager ResourceManager => s_resourceManager ?? (s_resourceManager = new ResourceManager(typeof(SR))); internal static string ChannelClosedException_DefaultMessage => GetResourceString("ChannelClosedException_DefaultMessage"); internal static string InvalidOperation_IncompleteAsyncOperation => GetResourceString("InvalidOperation_IncompleteAsyncOperation"); internal static string InvalidOperation_MultipleContinuations => GetResourceString("InvalidOperation_MultipleContinuations"); internal static string InvalidOperation_IncorrectToken => GetResourceString("InvalidOperation_IncorrectToken"); private static bool GetUsingResourceKeysSwitchValue() { if (!AppContext.TryGetSwitch("System.Resources.UseSystemResourceKeys", out var isEnabled)) { return false; } return isEnabled; } internal static bool UsingResourceKeys() { return s_usingResourceKeys; } private static string GetResourceString(string resourceKey) { if (UsingResourceKeys()) { return resourceKey; } string result = null; try { result = ResourceManager.GetString(resourceKey); } catch (MissingManifestResourceException) { } return result; } private static string GetResourceString(string resourceKey, string defaultString) { string resourceString = GetResourceString(resourceKey); if (!(resourceKey == resourceString) && resourceString != null) { return resourceString; } return defaultString; } internal static string Format(string resourceFormat, object p1) { if (UsingResourceKeys()) { return string.Join(", ", resourceFormat, p1); } return string.Format(resourceFormat, p1); } internal static string Format(string resourceFormat, object p1, object p2) { if (UsingResourceKeys()) { return string.Join(", ", resourceFormat, p1, p2); } return string.Format(resourceFormat, p1, p2); } internal static string Format(string resourceFormat, object p1, object p2, object p3) { if (UsingResourceKeys()) { return string.Join(", ", resourceFormat, p1, p2, p3); } return string.Format(resourceFormat, p1, p2, p3); } internal static string Format(string resourceFormat, params object[] args) { if (args != null) { if (UsingResourceKeys()) { return resourceFormat + ", " + string.Join(", ", args); } return string.Format(resourceFormat, args); } return resourceFormat; } internal static string Format(IFormatProvider provider, string resourceFormat, object p1) { if (UsingResourceKeys()) { return string.Join(", ", resourceFormat, p1); } return string.Format(provider, resourceFormat, p1); } internal static string Format(IFormatProvider provider, string resourceFormat, object p1, object p2) { if (UsingResourceKeys()) { return string.Join(", ", resourceFormat, p1, p2); } return string.Format(provider, resourceFormat, p1, p2); } internal static string Format(IFormatProvider provider, string resourceFormat, object p1, object p2, object p3) { if (UsingResourceKeys()) { return string.Join(", ", resourceFormat, p1, p2, p3); } return string.Format(provider, resourceFormat, p1, p2, p3); } internal static string Format(IFormatProvider provider, string resourceFormat, params object[] args) { if (args != null) { if (UsingResourceKeys()) { return resourceFormat + ", " + string.Join(", ", args); } return string.Format(provider, resourceFormat, args); } return resourceFormat; } } internal static class ExceptionPolyfills { [SpecialName] public sealed class $E6188BA5B951F1F7AA9135E0EBB76F2B { [SpecialName] public static class $96F0261AC622664B8B003966835C0332 { } [ExtensionMarker("$96F0261AC622664B8B003966835C0332")] public static void ThrowIfNull([NotNull] object argument, [CallerArgumentExpression("argument")] string paramName = null) { throw null; } } [SpecialName] public sealed class $3F30F31B33543D5FB8E174FB4FD780B9 { [SpecialName] public static class $1F10CFA08738E6D8AF61CBECC6763DBC { } [ExtensionMarker("$1F10CFA08738E6D8AF61CBECC6763DBC")] public static void ThrowIf([DoesNotReturnIf(true)] bool condition, object instance) { throw null; } [ExtensionMarker("$1F10CFA08738E6D8AF61CBECC6763DBC")] public static void ThrowIf([DoesNotReturnIf(true)] bool condition, Type type) { throw null; } } public static void ThrowIfNull([NotNull] object argument, [CallerArgumentExpression("argument")] string paramName = null) { if (argument == null) { ThrowArgumentNullException(paramName); } } [DoesNotReturn] private static void ThrowArgumentNullException(string paramName) { throw new ArgumentNullException(paramName); } public static void ThrowIf([DoesNotReturnIf(true)] bool condition, object instance) { if (condition) { ThrowObjectDisposedException(instance); } } public static void ThrowIf([DoesNotReturnIf(true)] bool condition, Type type) { if (condition) { ThrowObjectDisposedException(type); } } [DoesNotReturn] private static void ThrowObjectDisposedException(object instance) { throw new ObjectDisposedException(instance?.GetType().FullName); } [DoesNotReturn] private static void ThrowObjectDisposedException(Type type) { throw new ObjectDisposedException(type?.FullName); } } } namespace System.Diagnostics.CodeAnalysis { [AttributeUsage(AttributeTargets.Method | AttributeTargets.Property, Inherited = false, AllowMultiple = true)] internal sealed class MemberNotNullAttribute : Attribute { public string[] Members { get; } public MemberNotNullAttribute(string member) { Members = new string[1] { member }; } public MemberNotNullAttribute(params string[] members) { Members = members; } } [AttributeUsage(AttributeTargets.Method | AttributeTargets.Property, Inherited = false, AllowMultiple = true)] internal sealed class MemberNotNullWhenAttribute : Attribute { public bool ReturnValue { get; } public string[] Members { get; } public MemberNotNullWhenAttribute(bool returnValue, string member) { ReturnValue = returnValue; Members = new string[1] { member }; } public MemberNotNullWhenAttribute(bool returnValue, params string[] members) { ReturnValue = returnValue; Members = members; } } } namespace System.Runtime.CompilerServices { [AttributeUsage(AttributeTargets.Parameter, AllowMultiple = false, Inherited = false)] internal sealed class CallerArgumentExpressionAttribute : Attribute { public string ParameterName { get; } public CallerArgumentExpressionAttribute(string parameterName) { ParameterName = parameterName; } } } namespace System.Runtime.InteropServices { [AttributeUsage(AttributeTargets.Method, AllowMultiple = false, Inherited = false)] internal sealed class LibraryImportAttribute : Attribute { public string LibraryName { get; } public string EntryPoint { get; set; } public StringMarshalling StringMarshalling { get; set; } public Type StringMarshallingCustomType { get; set; } public bool SetLastError { get; set; } public LibraryImportAttribute(string libraryName) { LibraryName = libraryName; } } internal enum StringMarshalling { Custom, Utf8, Utf16 } } namespace System.Collections.Generic { [DebuggerDisplay("Count = {_size}")] internal sealed class Deque { [CompilerGenerated] private sealed class d__13 : IEnumerator, IEnumerator, IDisposable { private int <>1__state; private T <>2__current; public Deque <>4__this; private int 5__2; private int 5__3; T IEnumerator.Current { [DebuggerHidden] get { return <>2__current; } } object IEnumerator.Current { [DebuggerHidden] get { return <>2__current; } } [DebuggerHidden] public d__13(int <>1__state) { this.<>1__state = <>1__state; } [DebuggerHidden] void IDisposable.Dispose() { <>1__state = -2; } private bool MoveNext() { int num = <>1__state; Deque deque = <>4__this; switch (num) { default: return false; case 0: <>1__state = -1; 5__2 = deque._head; 5__3 = deque._size; break; case 1: <>1__state = -1; 5__2 = (5__2 + 1) % deque._array.Length; break; } if (5__3-- > 0) { <>2__current = deque._array[5__2]; <>1__state = 1; return true; } return false; } bool IEnumerator.MoveNext() { //ILSpy generated this explicit interface implementation from .override directive in MoveNext return this.MoveNext(); } [DebuggerHidden] void IEnumerator.Reset() { throw new NotSupportedException(); } } private T[] _array = Array.Empty(); private int _head; private int _tail; private int _size; public int Count => _size; public bool IsEmpty => _size == 0; public void EnqueueTail(T item) { if (_size == _array.Length) { Grow(); } _array[_tail] = item; if (++_tail == _array.Length) { _tail = 0; } _size++; } public T DequeueHead() { T result = _array[_head]; _array[_head] = default(T); if (++_head == _array.Length) { _head = 0; } _size--; return result; } public T PeekHead() { return _array[_head]; } public T PeekTail() { int num = _tail - 1; if (num == -1) { num = _array.Length - 1; } return _array[num]; } public T DequeueTail() { if (--_tail == -1) { _tail = _array.Length - 1; } T result = _array[_tail]; _array[_tail] = default(T); _size--; return result; } [IteratorStateMachine(typeof(Deque<>.d__13))] public IEnumerator GetEnumerator() { //yield-return decompiler failed: Unexpected instruction in Iterator.Dispose() return new d__13(0) { <>4__this = this }; } private void Grow() { int num = (int)((long)_array.Length * 2L); if (num < _array.Length + 4) { num = _array.Length + 4; } T[] array = new T[num]; if (_head == 0) { Array.Copy(_array, array, _size); } else { Array.Copy(_array, _head, array, 0, _array.Length - _head); Array.Copy(_array, 0, array, _array.Length - _head, _tail); } _array = array; _head = 0; _tail = _size; } } } namespace System.Collections.Concurrent { internal interface IProducerConsumerQueue : IEnumerable, IEnumerable { bool IsEmpty { get; } int Count { get; } void Enqueue(T item); bool TryDequeue([MaybeNullWhen(false)] out T result); int GetCountSafe(object syncObj); } [DebuggerDisplay("Count = {Count}")] internal sealed class MultiProducerMultiConsumerQueue : ConcurrentQueue, IProducerConsumerQueue, IEnumerable, IEnumerable { bool IProducerConsumerQueue.IsEmpty => base.IsEmpty; int IProducerConsumerQueue.Count => base.Count; void IProducerConsumerQueue.Enqueue(T item) { Enqueue(item); } bool IProducerConsumerQueue.TryDequeue([MaybeNullWhen(false)] out T result) { return TryDequeue(out result); } int IProducerConsumerQueue.GetCountSafe(object syncObj) { return base.Count; } } [DebuggerDisplay("Count = {Count}")] [DebuggerTypeProxy(typeof(SingleProducerSingleConsumerQueue<>.SingleProducerSingleConsumerQueue_DebugView))] internal sealed class SingleProducerSingleConsumerQueue : IProducerConsumerQueue, IEnumerable, IEnumerable { [StructLayout(LayoutKind.Sequential)] private sealed class Segment { internal Segment _next; internal readonly T[] _array; internal SegmentState _state; internal Segment(int size) { _array = new T[size]; } } private struct SegmentState { internal Internal.PaddingFor32 _pad0; internal volatile int _first; internal int _lastCopy; internal Internal.PaddingFor32 _pad1; internal int _firstCopy; internal volatile int _last; internal Internal.PaddingFor32 _pad2; } private sealed class SingleProducerSingleConsumerQueue_DebugView { private readonly SingleProducerSingleConsumerQueue _queue; [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)] public T[] Items => new List(_queue).ToArray(); public SingleProducerSingleConsumerQueue_DebugView(SingleProducerSingleConsumerQueue queue) { _queue = queue; } } [CompilerGenerated] private sealed class d__15 : IEnumerator, IEnumerator, IDisposable { private int <>1__state; private T <>2__current; public SingleProducerSingleConsumerQueue <>4__this; private Segment 5__2; private int 5__3; T IEnumerator.Current { [DebuggerHidden] get { return <>2__current; } } object IEnumerator.Current { [DebuggerHidden] get { return <>2__current; } } [DebuggerHidden] public d__15(int <>1__state) { this.<>1__state = <>1__state; } [DebuggerHidden] void IDisposable.Dispose() { 5__2 = null; <>1__state = -2; } private bool MoveNext() { int num = <>1__state; SingleProducerSingleConsumerQueue singleProducerSingleConsumerQueue = <>4__this; if (num != 0) { if (num != 1) { return false; } <>1__state = -1; 5__3 = (5__3 + 1) & (5__2._array.Length - 1); goto IL_0095; } <>1__state = -1; 5__2 = singleProducerSingleConsumerQueue._head; goto IL_00c0; IL_0095: if (5__3 != 5__2._state._last) { <>2__current = 5__2._array[5__3]; <>1__state = 1; return true; } 5__2 = 5__2._next; goto IL_00c0; IL_00c0: if (5__2 != null) { 5__3 = 5__2._state._first; goto IL_0095; } 5__2 = null; return false; } bool IEnumerator.MoveNext() { //ILSpy generated this explicit interface implementation from .override directive in MoveNext return this.MoveNext(); } [DebuggerHidden] void IEnumerator.Reset() { throw new NotSupportedException(); } } private const int InitialSegmentSize = 32; private const int MaxSegmentSize = 16777216; private volatile Segment _head; private volatile Segment _tail; public bool IsEmpty { get { Segment head = _head; if (head._state._first != head._state._lastCopy) { return false; } if (head._state._first != head._state._last) { return false; } return head._next == null; } } public int Count { get { int num = 0; for (Segment segment = _head; segment != null; segment = segment._next) { int num2 = segment._array.Length; int first; int last; do { first = segment._state._first; last = segment._state._last; } while (first != segment._state._first); num += (last - first) & (num2 - 1); } return num; } } public SingleProducerSingleConsumerQueue() { _head = (_tail = new Segment(32)); } public void Enqueue(T item) { Segment segment = _tail; T[] array = segment._array; int last = segment._state._last; int num = (last + 1) & (array.Length - 1); if (num != segment._state._firstCopy) { array[last] = item; segment._state._last = num; } else { EnqueueSlow(item, ref segment); } } private void EnqueueSlow(T item, ref Segment segment) { if (segment._state._firstCopy != segment._state._first) { segment._state._firstCopy = segment._state._first; Enqueue(item); return; } Segment segment2 = new Segment(Math.Min(_tail._array.Length * 2, 16777216)); segment2._array[0] = item; segment2._state._last = 1; segment2._state._lastCopy = 1; try { } finally { Volatile.Write(ref _tail._next, segment2); _tail = segment2; } } public bool TryDequeue([MaybeNullWhen(false)] out T result) { Segment head = _head; T[] array = head._array; int first = head._state._first; if (first != head._state._lastCopy) { result = array[first]; array[first] = default(T); head._state._first = (first + 1) & (array.Length - 1); return true; } return TryDequeueSlow(head, array, peek: false, out result); } public bool TryPeek([MaybeNullWhen(false)] out T result) { Segment head = _head; T[] array = head._array; int first = head._state._first; if (first != head._state._lastCopy) { result = array[first]; return true; } return TryDequeueSlow(head, array, peek: true, out result); } private bool TryDequeueSlow(Segment segment, T[] array, bool peek, [MaybeNullWhen(false)] out T result) { if (segment._state._last != segment._state._lastCopy) { segment._state._lastCopy = segment._state._last; if (!peek) { return TryDequeue(out result); } return TryPeek(out result); } if (segment._next != null && segment._state._first == segment._state._last) { segment = segment._next; array = segment._array; _head = segment; } int first = segment._state._first; if (first == segment._state._last) { result = default(T); return false; } result = array[first]; if (!peek) { array[first] = default(T); segment._state._first = (first + 1) & (segment._array.Length - 1); segment._state._lastCopy = segment._state._last; } return true; } public bool TryDequeueIf(Predicate predicate, [MaybeNullWhen(false)] out T result) { Segment head = _head; T[] array = head._array; int first = head._state._first; if (first != head._state._lastCopy) { result = array[first]; if (predicate == null || predicate(result)) { array[first] = default(T); head._state._first = (first + 1) & (array.Length - 1); return true; } result = default(T); return false; } return TryDequeueIfSlow(predicate, head, array, out result); } private bool TryDequeueIfSlow(Predicate predicate, Segment segment, T[] array, [MaybeNullWhen(false)] out T result) { if (segment._state._last != segment._state._lastCopy) { segment._state._lastCopy = segment._state._last; return TryDequeueIf(predicate, out result); } if (segment._next != null && segment._state._first == segment._state._last) { segment = segment._next; array = segment._array; _head = segment; } int first = segment._state._first; if (first == segment._state._last) { result = default(T); return false; } result = array[first]; if (predicate == null || predicate(result)) { array[first] = default(T); segment._state._first = (first + 1) & (segment._array.Length - 1); segment._state._lastCopy = segment._state._last; return true; } result = default(T); return false; } public void Clear() { T result; while (TryDequeue(out result)) { } } [IteratorStateMachine(typeof(SingleProducerSingleConsumerQueue<>.d__15))] public IEnumerator GetEnumerator() { //yield-return decompiler failed: Unexpected instruction in Iterator.Dispose() return new d__15(0) { <>4__this = this }; } IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } int IProducerConsumerQueue.GetCountSafe(object syncObj) { lock (syncObj) { return Count; } } } } namespace System.Threading.Channels { internal abstract class AsyncOperation { private protected sealed class CapturedSchedulerAndExecutionContext { internal readonly object _scheduler; internal readonly ExecutionContext _executionContext; public CapturedSchedulerAndExecutionContext(object scheduler, ExecutionContext executionContext) { _scheduler = scheduler; _executionContext = executionContext; } } protected static readonly Action s_availableSentinel = AvailableSentinel; protected static readonly Action s_completedSentinel = CompletedSentinel; private readonly CancellationTokenRegistration _cancellationRegistration; private readonly Action _cancellationCallback; private protected readonly bool _pooled; private volatile int _completionReserved; private protected ExceptionDispatchInfo _error; private protected Action _continuation; private protected object _continuationState; private protected object _capturedContext; private protected short _currentId; public bool RunContinuationsAsynchronously { get; } private CancellationToken CancellationToken { get; } internal bool IsCompleted => (object)_continuation == s_completedSentinel; private static void AvailableSentinel(object s) { } private static void CompletedSentinel(object s) { } protected static void ThrowIncompleteOperationException() { throw new InvalidOperationException(System.SR.InvalidOperation_IncompleteAsyncOperation); } protected static void ThrowMultipleContinuations() { throw new InvalidOperationException(System.SR.InvalidOperation_MultipleContinuations); } protected static void ThrowIncorrectCurrentIdException() { throw new InvalidOperationException(System.SR.InvalidOperation_IncorrectToken); } protected AsyncOperation(bool runContinuationsAsynchronously, CancellationToken cancellationToken, bool pooled, Action cancellationCallback) { _continuation = (pooled ? s_availableSentinel : null); _pooled = pooled; RunContinuationsAsynchronously = runContinuationsAsynchronously; if (cancellationToken.CanBeCanceled) { _cancellationCallback = cancellationCallback; CancellationToken = cancellationToken; _cancellationRegistration = cancellationToken.Register(delegate(object s) { AsyncOperation asyncOperation = (AsyncOperation)s; asyncOperation._cancellationCallback(asyncOperation, asyncOperation.CancellationToken); }, this); } } public bool TrySetException(Exception exception) { if (TryReserveCompletionIfCancelable()) { _error = ExceptionDispatchInfo.Capture(exception); SignalCompletion(); return true; } return false; } public bool TrySetCanceled(CancellationToken cancellationToken = default(CancellationToken)) { if (TryReserveCompletionIfCancelable()) { _error = ExceptionDispatchInfo.Capture(new OperationCanceledException(cancellationToken)); SignalCompletion(); return true; } return false; } public bool TryReserveCompletionIfCancelable() { if (CancellationToken.CanBeCanceled) { return Interlocked.Exchange(ref _completionReserved, 1) == 0; } return true; } private protected void SignalCompletion() { Unregister(_cancellationRegistration); if (_continuation == null && Interlocked.CompareExchange(ref _continuation, s_completedSentinel, null) == null) { return; } object capturedContext = _capturedContext; if ((capturedContext == null || capturedContext is ExecutionContext) ? true : false) { if (RunContinuationsAsynchronously) { UnsafeQueueSetCompletionAndInvokeContinuation(); return; } } else { SynchronizationContext synchronizationContext = (capturedContext as SynchronizationContext) ?? ((capturedContext as CapturedSchedulerAndExecutionContext)?._scheduler as SynchronizationContext); if (synchronizationContext != null) { if (RunContinuationsAsynchronously || synchronizationContext != SynchronizationContext.Current) { synchronizationContext.Post(delegate(object s) { ((AsyncOperation)s).SetCompletionAndInvokeContinuation(); }, this); return; } } else { TaskScheduler taskScheduler = (capturedContext as TaskScheduler) ?? ((capturedContext as CapturedSchedulerAndExecutionContext)?._scheduler as TaskScheduler); if (RunContinuationsAsynchronously || taskScheduler != TaskScheduler.Current) { Task.Factory.StartNew(delegate(object s) { ((AsyncOperation)s).SetCompletionAndInvokeContinuation(); }, this, CancellationToken.None, TaskCreationOptions.DenyChildAttach, taskScheduler); return; } } } SetCompletionAndInvokeContinuation(); } private void SetCompletionAndInvokeContinuation() { object capturedContext = _capturedContext; ExecutionContext executionContext = ((capturedContext == null) ? null : ((capturedContext as ExecutionContext) ?? (capturedContext as CapturedSchedulerAndExecutionContext)?._executionContext)); if (executionContext == null) { Action continuation = _continuation; _continuation = s_completedSentinel; continuation(_continuationState); return; } ExecutionContext.Run(executionContext, delegate(object s) { AsyncOperation asyncOperation = (AsyncOperation)s; Action continuation2 = asyncOperation._continuation; asyncOperation._continuation = s_completedSentinel; continuation2(asyncOperation._continuationState); }, this); } public void OnCompleted(Action continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) { if (_currentId != token) { ThrowIncorrectCurrentIdException(); } if (_continuationState != null) { ThrowMultipleContinuations(); } _continuationState = state; if ((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0) { _capturedContext = ExecutionContext.Capture(); } SynchronizationContext synchronizationContext = null; TaskScheduler taskScheduler = null; if ((flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0) { synchronizationContext = SynchronizationContext.Current; if (synchronizationContext != null && synchronizationContext.GetType() != typeof(SynchronizationContext)) { _capturedContext = ((_capturedContext == null) ? ((object)synchronizationContext) : ((object)new CapturedSchedulerAndExecutionContext(synchronizationContext, (ExecutionContext)_capturedContext))); } else { synchronizationContext = null; taskScheduler = TaskScheduler.Current; if (taskScheduler != TaskScheduler.Default) { _capturedContext = ((_capturedContext == null) ? ((object)taskScheduler) : ((object)new CapturedSchedulerAndExecutionContext(taskScheduler, (ExecutionContext)_capturedContext))); } else { taskScheduler = null; } } } Action action = Interlocked.CompareExchange(ref _continuation, continuation, null); if (action == null) { return; } if ((object)action != s_completedSentinel) { ThrowMultipleContinuations(); } if (_capturedContext == null) { ChannelUtilities.UnsafeQueueUserWorkItem(continuation, state); } else if (synchronizationContext != null) { synchronizationContext.Post(delegate(object s) { KeyValuePair, object> keyValuePair = (KeyValuePair, object>)s; keyValuePair.Key(keyValuePair.Value); }, new KeyValuePair, object>(continuation, state)); } else if (taskScheduler != null) { Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, taskScheduler); } else { ChannelUtilities.QueueUserWorkItem(continuation, state); } } private void UnsafeQueueSetCompletionAndInvokeContinuation() { ThreadPool.UnsafeQueueUserWorkItem(delegate(object s) { ((AsyncOperation)s).SetCompletionAndInvokeContinuation(); }, this); } private static void Unregister(CancellationTokenRegistration registration) { registration.Dispose(); } } internal abstract class AsyncOperation : AsyncOperation, IValueTaskSource { public TSelf Next { get; set; } public TSelf Previous { get; set; } public ValueTask ValueTask => new ValueTask(this, _currentId); protected AsyncOperation(bool runContinuationsAsynchronously, CancellationToken cancellationToken = default(CancellationToken), bool pooled = false, Action cancellationCallback = null) : base(runContinuationsAsynchronously, cancellationToken, pooled, cancellationCallback) { } public ValueTaskSourceStatus GetStatus(short token) { if (_currentId != token) { AsyncOperation.ThrowIncorrectCurrentIdException(); } if (base.IsCompleted) { if (_error != null) { if (!(_error.SourceException is OperationCanceledException)) { return ValueTaskSourceStatus.Faulted; } return ValueTaskSourceStatus.Canceled; } return ValueTaskSourceStatus.Succeeded; } return ValueTaskSourceStatus.Pending; } void IValueTaskSource.GetResult(short token) { if (_currentId != token) { AsyncOperation.ThrowIncorrectCurrentIdException(); } if (!base.IsCompleted) { AsyncOperation.ThrowIncompleteOperationException(); } ExceptionDispatchInfo error = _error; _currentId++; if (_pooled) { Volatile.Write(ref _continuation, AsyncOperation.s_availableSentinel); } error?.Throw(); } } internal abstract class AsyncOperation : AsyncOperation, IValueTaskSource where TSelf : AsyncOperation { private TResult _result; public ValueTask ValueTaskOfT => new ValueTask(this, _currentId); public AsyncOperation(bool runContinuationsAsynchronously, CancellationToken cancellationToken = default(CancellationToken), bool pooled = false, Action cancellationCallback = null) : base(runContinuationsAsynchronously, cancellationToken, pooled, cancellationCallback) { } public TResult GetResult(short token) { if (_currentId != token) { AsyncOperation.ThrowIncorrectCurrentIdException(); } if (!base.IsCompleted) { AsyncOperation.ThrowIncompleteOperationException(); } ExceptionDispatchInfo error = _error; TResult result = _result; _currentId++; if (_pooled) { Volatile.Write(ref _continuation, AsyncOperation.s_availableSentinel); } error?.Throw(); return result; } public bool TryOwnAndReset() { if ((object)Interlocked.CompareExchange(ref _continuation, null, AsyncOperation.s_availableSentinel) == AsyncOperation.s_availableSentinel) { _continuationState = null; _result = default(TResult); _error = null; _capturedContext = null; return true; } return false; } public bool TrySetResult(TResult result) { if (TryReserveCompletionIfCancelable()) { DangerousSetResult(result); return true; } return false; } public void DangerousSetResult(TResult result) { _result = result; SignalCompletion(); } } internal sealed class BlockedReadAsyncOperation : AsyncOperation, TResult> { public BlockedReadAsyncOperation(bool runContinuationsAsynchronously, CancellationToken cancellationToken = default(CancellationToken), bool pooled = false, Action cancellationCallback = null) : base(runContinuationsAsynchronously, cancellationToken, pooled, cancellationCallback) { } } internal sealed class BlockedWriteAsyncOperation : AsyncOperation, VoidResult> { public T Item { get; set; } public BlockedWriteAsyncOperation(bool runContinuationsAsynchronously, CancellationToken cancellationToken = default(CancellationToken), bool pooled = false, Action cancellationCallback = null) : base(runContinuationsAsynchronously, cancellationToken, pooled, cancellationCallback) { } } internal sealed class WaitingReadAsyncOperation : AsyncOperation { public WaitingReadAsyncOperation(bool runContinuationsAsynchronously, CancellationToken cancellationToken = default(CancellationToken), bool pooled = false, Action cancellationCallback = null) : base(runContinuationsAsynchronously, cancellationToken, pooled, cancellationCallback) { } } internal sealed class WaitingWriteAsyncOperation : AsyncOperation { public WaitingWriteAsyncOperation(bool runContinuationsAsynchronously, CancellationToken cancellationToken = default(CancellationToken), bool pooled = false, Action cancellationCallback = null) : base(runContinuationsAsynchronously, cancellationToken, pooled, cancellationCallback) { } } [DebuggerDisplay("Items = {ItemsCountForDebugger}, Capacity = {_bufferedCapacity}, Mode = {_mode}, Closed = {ChannelIsClosedForDebugger}")] [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] internal sealed class BoundedChannel : Channel, IDebugEnumerable { [DebuggerDisplay("Items = {ItemsCountForDebugger}")] [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] private sealed class BoundedChannelReader : ChannelReader, IDebugEnumerable { internal readonly BoundedChannel _parent; private readonly BlockedReadAsyncOperation _readerSingleton; private readonly WaitingReadAsyncOperation _waiterSingleton; public override Task Completion => _parent._completion.Task; public override bool CanCount => true; public override bool CanPeek => true; public override int Count { get { BoundedChannel parent = _parent; lock (parent.SyncObj) { return parent._items.Count; } } } private int ItemsCountForDebugger => _parent._items.Count; internal BoundedChannelReader(BoundedChannel parent) { _parent = parent; _readerSingleton = new BlockedReadAsyncOperation(parent._runContinuationsAsynchronously, default(CancellationToken), pooled: true); _waiterSingleton = new WaitingReadAsyncOperation(parent._runContinuationsAsynchronously, default(CancellationToken), pooled: true); } public override bool TryRead([MaybeNullWhen(false)] out T item) { BoundedChannel parent = _parent; lock (parent.SyncObj) { if (!parent._items.IsEmpty) { item = DequeueItemAndPostProcess(); return true; } } item = default(T); return false; } public override bool TryPeek([MaybeNullWhen(false)] out T item) { BoundedChannel parent = _parent; lock (parent.SyncObj) { if (!parent._items.IsEmpty) { item = parent._items.PeekHead(); return true; } } item = default(T); return false; } public override ValueTask ReadAsync(CancellationToken cancellationToken) { if (cancellationToken.IsCancellationRequested) { return new ValueTask(Task.FromCanceled(cancellationToken)); } BoundedChannel parent = _parent; lock (parent.SyncObj) { if (!parent._items.IsEmpty) { return new ValueTask(DequeueItemAndPostProcess()); } if (parent._doneWriting != null) { return ChannelUtilities.GetInvalidCompletionValueTask(parent._doneWriting); } if (!cancellationToken.CanBeCanceled) { BlockedReadAsyncOperation readerSingleton = _readerSingleton; if (readerSingleton.TryOwnAndReset()) { ChannelUtilities.Enqueue(ref parent._blockedReadersHead, readerSingleton); return readerSingleton.ValueTaskOfT; } } BlockedReadAsyncOperation blockedReadAsyncOperation = new BlockedReadAsyncOperation(parent._runContinuationsAsynchronously, cancellationToken, pooled: false, _parent.CancellationCallbackDelegate); ChannelUtilities.Enqueue(ref parent._blockedReadersHead, blockedReadAsyncOperation); return blockedReadAsyncOperation.ValueTaskOfT; } } public override ValueTask WaitToReadAsync(CancellationToken cancellationToken) { if (cancellationToken.IsCancellationRequested) { return new ValueTask(Task.FromCanceled(cancellationToken)); } BoundedChannel parent = _parent; lock (parent.SyncObj) { if (!parent._items.IsEmpty) { return new ValueTask(result: true); } if (parent._doneWriting != null) { return (parent._doneWriting != ChannelUtilities.s_doneWritingSentinel) ? new ValueTask(Task.FromException(parent._doneWriting)) : default(ValueTask); } if (!cancellationToken.CanBeCanceled) { WaitingReadAsyncOperation waiterSingleton = _waiterSingleton; if (waiterSingleton.TryOwnAndReset()) { ChannelUtilities.Enqueue(ref parent._waitingReadersHead, waiterSingleton); return waiterSingleton.ValueTaskOfT; } } WaitingReadAsyncOperation waitingReadAsyncOperation = new WaitingReadAsyncOperation(parent._runContinuationsAsynchronously, cancellationToken, pooled: false, _parent.CancellationCallbackDelegate); ChannelUtilities.Enqueue(ref parent._waitingReadersHead, waitingReadAsyncOperation); return waitingReadAsyncOperation.ValueTaskOfT; } } private T DequeueItemAndPostProcess() { BoundedChannel parent = _parent; T result = parent._items.DequeueHead(); if (parent._doneWriting != null) { if (parent._items.IsEmpty) { ChannelUtilities.Complete(parent._completion, parent._doneWriting); } } else { BlockedWriteAsyncOperation op; while (ChannelUtilities.TryDequeue(ref parent._blockedWritersHead, out op)) { if (op.TrySetResult(default(VoidResult))) { parent._items.EnqueueTail(op.Item); return result; } } ChannelUtilities.SetOperations(ref parent._waitingWritersHead, result: true); } return result; } IEnumerator IDebugEnumerable.GetEnumerator() { return _parent._items.GetEnumerator(); } } [DebuggerDisplay("Items = {ItemsCountForDebugger}, Capacity = {CapacityForDebugger}")] [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] private sealed class BoundedChannelWriter : ChannelWriter, IDebugEnumerable { internal readonly BoundedChannel _parent; private readonly BlockedWriteAsyncOperation _writerSingleton; private readonly WaitingWriteAsyncOperation _waiterSingleton; private int ItemsCountForDebugger => _parent._items.Count; private int CapacityForDebugger => _parent._bufferedCapacity; internal BoundedChannelWriter(BoundedChannel parent) { _parent = parent; _writerSingleton = new BlockedWriteAsyncOperation(runContinuationsAsynchronously: true, default(CancellationToken), pooled: true); _waiterSingleton = new WaitingWriteAsyncOperation(runContinuationsAsynchronously: true, default(CancellationToken), pooled: true); } public override bool TryComplete(Exception error) { BoundedChannel parent = _parent; bool isEmpty; BlockedReadAsyncOperation blockedReadersHead; BlockedWriteAsyncOperation blockedWritersHead; WaitingReadAsyncOperation waitingReadersHead; WaitingWriteAsyncOperation waitingWritersHead; lock (parent.SyncObj) { if (parent._doneWriting != null) { return false; } parent._doneWriting = error ?? ChannelUtilities.s_doneWritingSentinel; isEmpty = parent._items.IsEmpty; blockedReadersHead = parent._blockedReadersHead; blockedWritersHead = parent._blockedWritersHead; waitingReadersHead = parent._waitingReadersHead; waitingWritersHead = parent._waitingWritersHead; parent._blockedReadersHead = null; parent._blockedWritersHead = null; parent._waitingReadersHead = null; parent._waitingWritersHead = null; } if (isEmpty) { ChannelUtilities.Complete(parent._completion, error); } ChannelUtilities.FailOperations(blockedReadersHead, ChannelUtilities.CreateInvalidCompletionException(error)); ChannelUtilities.FailOperations(blockedWritersHead, ChannelUtilities.CreateInvalidCompletionException(error)); ChannelUtilities.SetOrFailOperations(waitingReadersHead, result: false, error); ChannelUtilities.SetOrFailOperations(waitingWritersHead, result: false, error); return true; } public override bool TryWrite(T item) { BlockedReadAsyncOperation blockedReadAsyncOperation = null; WaitingReadAsyncOperation waitingReadAsyncOperation = null; BoundedChannel parent = _parent; bool lockTaken = false; try { Monitor.Enter(parent.SyncObj, ref lockTaken); if (parent._doneWriting != null) { return false; } int count = parent._items.Count; if (count != 0) { if (count < parent._bufferedCapacity) { parent._items.EnqueueTail(item); return true; } if (parent._mode == BoundedChannelFullMode.Wait) { return false; } if (parent._mode == BoundedChannelFullMode.DropWrite) { Monitor.Exit(parent.SyncObj); lockTaken = false; parent._itemDropped?.Invoke(item); return true; } T obj = ((parent._mode == BoundedChannelFullMode.DropNewest) ? parent._items.DequeueTail() : parent._items.DequeueHead()); parent._items.EnqueueTail(item); Monitor.Exit(parent.SyncObj); lockTaken = false; parent._itemDropped?.Invoke(obj); return true; } blockedReadAsyncOperation = ChannelUtilities.TryDequeueAndReserveCompletionIfCancelable(ref parent._blockedReadersHead); if (blockedReadAsyncOperation == null) { parent._items.EnqueueTail(item); waitingReadAsyncOperation = ChannelUtilities.TryReserveCompletionIfCancelable(ref parent._waitingReadersHead); if (waitingReadAsyncOperation == null) { return true; } } } finally { if (lockTaken) { Monitor.Exit(parent.SyncObj); } } if (blockedReadAsyncOperation != null) { blockedReadAsyncOperation.DangerousSetResult(item); } else { ChannelUtilities.DangerousSetOperations(waitingReadAsyncOperation, result: true); } return true; } public override ValueTask WaitToWriteAsync(CancellationToken cancellationToken) { if (cancellationToken.IsCancellationRequested) { return new ValueTask(Task.FromCanceled(cancellationToken)); } BoundedChannel parent = _parent; lock (parent.SyncObj) { if (parent._doneWriting != null) { return (parent._doneWriting != ChannelUtilities.s_doneWritingSentinel) ? new ValueTask(Task.FromException(parent._doneWriting)) : default(ValueTask); } if (parent._items.Count < parent._bufferedCapacity || parent._mode != 0) { return new ValueTask(result: true); } if (!cancellationToken.CanBeCanceled) { WaitingWriteAsyncOperation waiterSingleton = _waiterSingleton; if (waiterSingleton.TryOwnAndReset()) { ChannelUtilities.Enqueue(ref parent._waitingWritersHead, waiterSingleton); return waiterSingleton.ValueTaskOfT; } } WaitingWriteAsyncOperation waitingWriteAsyncOperation = new WaitingWriteAsyncOperation(runContinuationsAsynchronously: true, cancellationToken, pooled: false, _parent.CancellationCallbackDelegate); ChannelUtilities.Enqueue(ref parent._waitingWritersHead, waitingWriteAsyncOperation); return waitingWriteAsyncOperation.ValueTaskOfT; } } public override ValueTask WriteAsync(T item, CancellationToken cancellationToken) { if (cancellationToken.IsCancellationRequested) { return new ValueTask(Task.FromCanceled(cancellationToken)); } BlockedReadAsyncOperation blockedReadAsyncOperation = null; WaitingReadAsyncOperation waitingReadAsyncOperation = null; BoundedChannel parent = _parent; bool lockTaken = false; try { Monitor.Enter(parent.SyncObj, ref lockTaken); if (parent._doneWriting != null) { return new ValueTask(Task.FromException(ChannelUtilities.CreateInvalidCompletionException(parent._doneWriting))); } int count = parent._items.Count; if (count != 0) { if (count < parent._bufferedCapacity) { parent._items.EnqueueTail(item); return default(ValueTask); } if (parent._mode == BoundedChannelFullMode.Wait) { if (!cancellationToken.CanBeCanceled) { BlockedWriteAsyncOperation writerSingleton = _writerSingleton; if (writerSingleton.TryOwnAndReset()) { writerSingleton.Item = item; ChannelUtilities.Enqueue(ref parent._blockedWritersHead, writerSingleton); return writerSingleton.ValueTask; } } BlockedWriteAsyncOperation blockedWriteAsyncOperation = new BlockedWriteAsyncOperation(runContinuationsAsynchronously: true, cancellationToken, pooled: false, _parent.CancellationCallbackDelegate) { Item = item }; ChannelUtilities.Enqueue(ref parent._blockedWritersHead, blockedWriteAsyncOperation); return blockedWriteAsyncOperation.ValueTask; } if (parent._mode == BoundedChannelFullMode.DropWrite) { Monitor.Exit(parent.SyncObj); lockTaken = false; parent._itemDropped?.Invoke(item); return default(ValueTask); } T obj = ((parent._mode == BoundedChannelFullMode.DropNewest) ? parent._items.DequeueTail() : parent._items.DequeueHead()); parent._items.EnqueueTail(item); Monitor.Exit(parent.SyncObj); lockTaken = false; parent._itemDropped?.Invoke(obj); return default(ValueTask); } blockedReadAsyncOperation = ChannelUtilities.TryDequeueAndReserveCompletionIfCancelable(ref parent._blockedReadersHead); if (blockedReadAsyncOperation == null) { parent._items.EnqueueTail(item); waitingReadAsyncOperation = ChannelUtilities.TryReserveCompletionIfCancelable(ref parent._waitingReadersHead); if (waitingReadAsyncOperation == null) { return default(ValueTask); } } } finally { if (lockTaken) { Monitor.Exit(parent.SyncObj); } } if (blockedReadAsyncOperation != null) { blockedReadAsyncOperation.DangerousSetResult(item); } else { ChannelUtilities.DangerousSetOperations(waitingReadAsyncOperation, result: true); } return default(ValueTask); } IEnumerator IDebugEnumerable.GetEnumerator() { return _parent._items.GetEnumerator(); } } private readonly BoundedChannelFullMode _mode; private readonly Action _itemDropped; private readonly TaskCompletionSource _completion; private readonly int _bufferedCapacity; private readonly Deque _items = new Deque(); private BlockedReadAsyncOperation _blockedReadersHead; private BlockedWriteAsyncOperation _blockedWritersHead; private WaitingReadAsyncOperation _waitingReadersHead; private WaitingWriteAsyncOperation _waitingWritersHead; private readonly bool _runContinuationsAsynchronously; private Exception _doneWriting; private object SyncObj => _items; private Action CancellationCallbackDelegate => delegate(object state, CancellationToken cancellationToken) { AsyncOperation asyncOperation = (AsyncOperation)state; if (asyncOperation.TrySetCanceled(cancellationToken)) { ChannelUtilities.UnsafeQueueUserWorkItem(delegate(KeyValuePair, AsyncOperation> state) { lock (state.Key.SyncObj) { AsyncOperation value = state.Value; if (!(value is BlockedReadAsyncOperation op)) { if (!(value is BlockedWriteAsyncOperation op2)) { if (!(value is WaitingReadAsyncOperation op3)) { if (value is WaitingWriteAsyncOperation op4) { ChannelUtilities.Remove(ref state.Key._waitingWritersHead, op4); } } else { ChannelUtilities.Remove(ref state.Key._waitingReadersHead, op3); } } else { ChannelUtilities.Remove(ref state.Key._blockedWritersHead, op2); } } else { ChannelUtilities.Remove(ref state.Key._blockedReadersHead, op); } } }, new KeyValuePair, AsyncOperation>(this, asyncOperation)); } }; private int ItemsCountForDebugger => _items.Count; private bool ChannelIsClosedForDebugger => _doneWriting != null; internal BoundedChannel(int bufferedCapacity, BoundedChannelFullMode mode, bool runContinuationsAsynchronously, Action itemDropped) { _bufferedCapacity = bufferedCapacity; _mode = mode; _runContinuationsAsynchronously = runContinuationsAsynchronously; _itemDropped = itemDropped; _completion = new TaskCompletionSource(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None); base.Reader = new BoundedChannelReader(this); base.Writer = new BoundedChannelWriter(this); } [Conditional("DEBUG")] private void AssertInvariants() { _ = _items.IsEmpty; _ = _items.Count; _ = _bufferedCapacity; _ = _blockedReadersHead; _ = _blockedWritersHead; _ = _completion.Task.IsCompleted; } IEnumerator IDebugEnumerable.GetEnumerator() { return _items.GetEnumerator(); } } public enum BoundedChannelFullMode { Wait, DropNewest, DropOldest, DropWrite } public static class Channel { public static Channel CreateUnbounded() { return new UnboundedChannel(runContinuationsAsynchronously: true); } public static Channel CreateUnbounded(UnboundedChannelOptions options) { ExceptionPolyfills.ThrowIfNull(options, "options"); if (options.SingleReader) { return new SingleConsumerUnboundedChannel(!options.AllowSynchronousContinuations); } return new UnboundedChannel(!options.AllowSynchronousContinuations); } public static Channel CreateBounded(int capacity) { if (capacity <= 0) { if (capacity != 0) { throw new ArgumentOutOfRangeException("capacity"); } return new RendezvousChannel(BoundedChannelFullMode.Wait, runContinuationsAsynchronously: true, null); } return new BoundedChannel(capacity, BoundedChannelFullMode.Wait, runContinuationsAsynchronously: true, null); } public static Channel CreateBounded(BoundedChannelOptions options) { return CreateBounded(options, null); } public static Channel CreateBounded(BoundedChannelOptions options, Action? itemDropped) { ExceptionPolyfills.ThrowIfNull(options, "options"); if (options.Capacity <= 0) { return new RendezvousChannel(options.FullMode, !options.AllowSynchronousContinuations, itemDropped); } return new BoundedChannel(options.Capacity, options.FullMode, !options.AllowSynchronousContinuations, itemDropped); } } [Serializable] public class ChannelClosedException : InvalidOperationException { public ChannelClosedException() : base(System.SR.ChannelClosedException_DefaultMessage) { } public ChannelClosedException(string? message) : base(message ?? System.SR.ChannelClosedException_DefaultMessage) { } public ChannelClosedException(Exception? innerException) : base(System.SR.ChannelClosedException_DefaultMessage, innerException) { } public ChannelClosedException(string? message, Exception? innerException) : base(message ?? System.SR.ChannelClosedException_DefaultMessage, innerException) { } protected ChannelClosedException(SerializationInfo info, StreamingContext context) : base(info, context) { } } public abstract class ChannelOptions { public bool SingleWriter { get; set; } public bool SingleReader { get; set; } public bool AllowSynchronousContinuations { get; set; } } public sealed class BoundedChannelOptions : ChannelOptions { private int _capacity; private BoundedChannelFullMode _mode; public int Capacity { get { return _capacity; } set { if (value < 0) { throw new ArgumentOutOfRangeException("value"); } _capacity = value; } } public BoundedChannelFullMode FullMode { get { return _mode; } set { if ((uint)value <= 3u) { _mode = value; return; } throw new ArgumentOutOfRangeException("value"); } } public BoundedChannelOptions(int capacity) { if (capacity < 0) { throw new ArgumentOutOfRangeException("capacity"); } _capacity = capacity; } } public sealed class UnboundedChannelOptions : ChannelOptions { } public abstract class ChannelReader { public virtual Task Completion => ChannelUtilities.s_neverCompletingTask; public virtual bool CanCount => false; public virtual bool CanPeek => false; public virtual int Count { get { throw new NotSupportedException(); } } public abstract bool TryRead([MaybeNullWhen(false)] out T item); public virtual bool TryPeek([MaybeNullWhen(false)] out T item) { item = default(T); return false; } public abstract ValueTask WaitToReadAsync(CancellationToken cancellationToken = default(CancellationToken)); public virtual ValueTask ReadAsync(CancellationToken cancellationToken = default(CancellationToken)) { if (cancellationToken.IsCancellationRequested) { return new ValueTask(Task.FromCanceled(cancellationToken)); } try { if (TryRead(out var item)) { return new ValueTask(item); } } catch (Exception ex) when ((!(ex is ChannelClosedException) && !(ex is OperationCanceledException)) || 1 == 0) { return new ValueTask(Task.FromException(ex)); } return ReadAsyncCore(cancellationToken); async ValueTask ReadAsyncCore(CancellationToken ct) { T item2; do { if (!(await WaitToReadAsync(ct).ConfigureAwait(continueOnCapturedContext: false))) { throw new ChannelClosedException(); } } while (!TryRead(out item2)); return item2; } } public virtual async IAsyncEnumerable ReadAllAsync([EnumeratorCancellation] CancellationToken cancellationToken = default(CancellationToken)) { while (await WaitToReadAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false)) { T item; while (TryRead(out item)) { yield return item; } } } } internal static class ChannelUtilities { internal static readonly Exception s_doneWritingSentinel = new Exception("s_doneWritingSentinel"); internal static readonly Task s_trueTask = Task.FromResult(result: true); internal static readonly Task s_falseTask = Task.FromResult(result: false); internal static readonly Task s_neverCompletingTask = new TaskCompletionSource().Task; internal static void Complete(TaskCompletionSource tcs, Exception error = null) { if (error is OperationCanceledException ex) { tcs.TrySetCanceled(ex.CancellationToken); } else if (error != null && error != s_doneWritingSentinel) { if (tcs.TrySetException(error)) { _ = tcs.Task.Exception; } } else { tcs.TrySetResult(); } } internal static ValueTask GetInvalidCompletionValueTask(Exception error) { return new ValueTask((error == s_doneWritingSentinel) ? Task.FromException(CreateInvalidCompletionException()) : ((error is OperationCanceledException ex) ? Task.FromCanceled(ex.CancellationToken.IsCancellationRequested ? ex.CancellationToken : new CancellationToken(canceled: true)) : Task.FromException(CreateInvalidCompletionException(error)))); } internal static TAsyncOp TryDequeueAndReserveCompletionIfCancelable(ref TAsyncOp head) where TAsyncOp : AsyncOperation { TAsyncOp op; while (TryDequeue(ref head, out op)) { if (op.TryReserveCompletionIfCancelable()) { return op; } } return null; } internal static bool TryDequeue(ref TAsyncOp head, [NotNullWhen(true)] out TAsyncOp op) where TAsyncOp : AsyncOperation { op = head; if (head == null) { return false; } if (head.Next == head) { head = null; } else { TAsyncOp previous = head.Previous; head = head.Next; head.Previous = previous; previous.Next = head; } TAsyncOp val = op; TAsyncOp next = (op.Previous = null); val.Next = next; return true; } internal static void Enqueue(ref TAsyncOp head, TAsyncOp op) where TAsyncOp : AsyncOperation { if (head == null) { TAsyncOp val2 = (op.Previous = op); TAsyncOp val4 = (op.Next = val2); head = val4; } else { TAsyncOp previous = head.Previous; op.Next = head; op.Previous = previous; previous.Next = op; head.Previous = op; } } internal static void Remove(ref TAsyncOp head, TAsyncOp op) where TAsyncOp : AsyncOperation { if (head == null || op.Next == null) { return; } if (op.Next == op) { head = null; } else { op.Previous.Next = op.Next; op.Next.Previous = op.Previous; if (head == op) { head = op.Next; } } TAsyncOp next = (op.Previous = null); op.Next = next; } internal static void SetOrFailOperations(TAsyncOp head, T result, Exception error = null) where TAsyncOp : AsyncOperation { if (error != null) { FailOperations(head, error); } else { SetOperations(ref head, result); } } internal static void SetOperations(ref TAsyncOp head, TResult result) where TAsyncOp : AsyncOperation { TAsyncOp val = head; if (val != null) { do { TAsyncOp next = val.Next; TAsyncOp val2 = val; TAsyncOp next2 = (val.Previous = null); val2.Next = next2; val.TrySetResult(result); val = next; } while (val != head); head = null; } } internal static void DangerousSetOperations(TAsyncOp head, TResult result) where TAsyncOp : AsyncOperation { TAsyncOp val = head; if (val != null) { do { TAsyncOp next = val.Next; TAsyncOp val2 = val; TAsyncOp next2 = (val.Previous = null); val2.Next = next2; val.DangerousSetResult(result); val = next; } while (val != head); } } internal static TAsyncOp TryReserveCompletionIfCancelable(ref TAsyncOp head) where TAsyncOp : AsyncOperation { TAsyncOp head2 = null; TAsyncOp val = head; if (val != null) { do { TAsyncOp next = val.Next; TAsyncOp val2 = val; TAsyncOp next2 = (val.Previous = null); val2.Next = next2; if (val.TryReserveCompletionIfCancelable()) { Enqueue(ref head2, val); } val = next; } while (val != head); head = null; } return head2; } internal static void FailOperations(TAsyncOp head, Exception error) where TAsyncOp : AsyncOperation { TAsyncOp val = head; if (val != null) { do { TAsyncOp next = val.Next; TAsyncOp val2 = val; TAsyncOp next2 = (val.Previous = null); val2.Next = next2; val.TrySetException(error); val = next; } while (val != head); } } [Conditional("DEBUG")] internal static void AssertAll(TAsyncOp head, Func condition, string message) where TAsyncOp : AsyncOperation { TAsyncOp val = head; if (val != null) { do { val = val.Next; } while (val != head); } } internal static long CountOperations(TAsyncOp head) where TAsyncOp : AsyncOperation { TAsyncOp val = head; long num = 0L; if (val != null) { do { num++; val = val.Next; } while (val != head); } return num; } internal static Exception CreateInvalidCompletionException(Exception inner = null) { if (!(inner is OperationCanceledException)) { if (inner == null || inner == s_doneWritingSentinel) { return new ChannelClosedException(); } return new ChannelClosedException(inner); } return inner; } internal static void UnsafeQueueUserWorkItem(Action action, object state) { QueueUserWorkItem(action, state); } internal static void UnsafeQueueUserWorkItem(Action action, TState state) { ThreadPool.UnsafeQueueUserWorkItem(delegate(object tuple) { Tuple, TState> tuple2 = (Tuple, TState>)tuple; tuple2.Item1(tuple2.Item2); }, Tuple.Create(action, state)); } internal static void QueueUserWorkItem(Action action, object state) { Task.Factory.StartNew(action, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default); } } public abstract class ChannelWriter { public virtual bool TryComplete(Exception? error = null) { return false; } public abstract bool TryWrite(T item); public abstract ValueTask WaitToWriteAsync(CancellationToken cancellationToken = default(CancellationToken)); public virtual ValueTask WriteAsync(T item, CancellationToken cancellationToken = default(CancellationToken)) { try { return cancellationToken.IsCancellationRequested ? new ValueTask(Task.FromCanceled(cancellationToken)) : (TryWrite(item) ? default(ValueTask) : WriteAsyncCore(item, cancellationToken)); } catch (Exception exception) { return new ValueTask(Task.FromException(exception)); } } private async ValueTask WriteAsyncCore(T innerItem, CancellationToken ct) { while (await WaitToWriteAsync(ct).ConfigureAwait(continueOnCapturedContext: false)) { if (TryWrite(innerItem)) { return; } } throw ChannelUtilities.CreateInvalidCompletionException(); } public void Complete(Exception? error = null) { if (!TryComplete(error)) { throw ChannelUtilities.CreateInvalidCompletionException(); } } } public abstract class Channel : Channel { } public abstract class Channel { public ChannelReader Reader { get; protected set; } public ChannelWriter Writer { get; protected set; } public static implicit operator ChannelReader(Channel channel) { return channel.Reader; } public static implicit operator ChannelWriter(Channel channel) { return channel.Writer; } } internal interface IDebugEnumerable { IEnumerator GetEnumerator(); } internal sealed class DebugEnumeratorDebugView { [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)] public T[] Items { get; } public DebugEnumeratorDebugView(IDebugEnumerable enumerable) { List list = new List(); foreach (T item in enumerable) { list.Add(item); } Items = list.ToArray(); base..ctor(); } } [DebuggerDisplay("{DebuggerDisplay,nq}")] internal sealed class RendezvousChannel : Channel { [DebuggerDisplay("{DebuggerDisplay,nq}")] private sealed class RendezvousChannelReader : ChannelReader { internal readonly RendezvousChannel _parent; private readonly BlockedReadAsyncOperation _readerSingleton; private readonly WaitingReadAsyncOperation _waiterSingleton; public override Task Completion => _parent._completion.Task; public override bool CanCount => true; public override bool CanPeek => true; public override int Count => 0; internal string DebuggerDisplay { get { long num; long num2; lock (_parent.SyncObj) { num = ChannelUtilities.CountOperations(_parent._blockedReadersHead); num2 = ChannelUtilities.CountOperations(_parent._waitingReadersHead); } return $"ReadAsync={num}, WaitToReadAsync={num2}"; } } internal RendezvousChannelReader(RendezvousChannel parent) { _parent = parent; _readerSingleton = new BlockedReadAsyncOperation(parent._runContinuationsAsynchronously, default(CancellationToken), pooled: true); _waiterSingleton = new WaitingReadAsyncOperation(parent._runContinuationsAsynchronously, default(CancellationToken), pooled: true); } public override bool TryRead([MaybeNullWhen(false)] out T item) { RendezvousChannel parent = _parent; BlockedWriteAsyncOperation blockedWriteAsyncOperation = null; lock (parent.SyncObj) { if (parent._doneWriting == null) { blockedWriteAsyncOperation = ChannelUtilities.TryDequeueAndReserveCompletionIfCancelable(ref parent._blockedWritersHead); } } if (blockedWriteAsyncOperation != null) { item = blockedWriteAsyncOperation.Item; blockedWriteAsyncOperation.DangerousSetResult(default(VoidResult)); return true; } item = default(T); return false; } public override bool TryPeek([MaybeNullWhen(false)] out T item) { RendezvousChannel parent = _parent; lock (parent.SyncObj) { if (parent._doneWriting == null) { BlockedWriteAsyncOperation blockedWritersHead = parent._blockedWritersHead; if (blockedWritersHead != null) { item = blockedWritersHead.Item; return true; } } } item = default(T); return false; } public override ValueTask ReadAsync(CancellationToken cancellationToken) { RendezvousChannel parent = _parent; if (cancellationToken.IsCancellationRequested) { return new ValueTask(Task.FromCanceled(cancellationToken)); } BlockedReadAsyncOperation blockedReadAsyncOperation = null; WaitingWriteAsyncOperation head = null; BlockedWriteAsyncOperation blockedWriteAsyncOperation = null; lock (parent.SyncObj) { if (parent._doneWriting != null) { return ChannelUtilities.GetInvalidCompletionValueTask(parent._doneWriting); } blockedWriteAsyncOperation = ChannelUtilities.TryDequeueAndReserveCompletionIfCancelable(ref parent._blockedWritersHead); if (blockedWriteAsyncOperation == null) { blockedReadAsyncOperation = ((!cancellationToken.CanBeCanceled && _readerSingleton.TryOwnAndReset()) ? _readerSingleton : new BlockedReadAsyncOperation(parent._runContinuationsAsynchronously, cancellationToken, pooled: false, _parent.CancellationCallbackDelegate)); ChannelUtilities.Enqueue(ref parent._blockedReadersHead, blockedReadAsyncOperation); head = ChannelUtilities.TryReserveCompletionIfCancelable(ref parent._waitingWritersHead); } } if (blockedWriteAsyncOperation != null) { ValueTask result = new ValueTask(blockedWriteAsyncOperation.Item); blockedWriteAsyncOperation.DangerousSetResult(default(VoidResult)); return result; } ChannelUtilities.DangerousSetOperations(head, result: true); return blockedReadAsyncOperation.ValueTaskOfT; } public override ValueTask WaitToReadAsync(CancellationToken cancellationToken) { if (cancellationToken.IsCancellationRequested) { return new ValueTask(Task.FromCanceled(cancellationToken)); } RendezvousChannel parent = _parent; lock (parent.SyncObj) { if (parent._doneWriting != null) { return (parent._doneWriting != ChannelUtilities.s_doneWritingSentinel) ? new ValueTask(Task.FromException(parent._doneWriting)) : default(ValueTask); } if (parent._blockedWritersHead != null) { return new ValueTask(result: true); } WaitingReadAsyncOperation waitingReadAsyncOperation = ((!cancellationToken.CanBeCanceled && _waiterSingleton.TryOwnAndReset()) ? _waiterSingleton : new WaitingReadAsyncOperation(parent._runContinuationsAsynchronously, cancellationToken, pooled: false, _parent.CancellationCallbackDelegate)); ChannelUtilities.Enqueue(ref parent._waitingReadersHead, waitingReadAsyncOperation); return waitingReadAsyncOperation.ValueTaskOfT; } } } [DebuggerDisplay("{DebuggerDisplay,nq}")] private sealed class RendezvousChannelWriter : ChannelWriter { internal readonly RendezvousChannel _parent; private readonly BlockedWriteAsyncOperation _writerSingleton; private readonly WaitingWriteAsyncOperation _waiterSingleton; internal string DebuggerDisplay { get { long num; long num2; lock (_parent.SyncObj) { num = ChannelUtilities.CountOperations(_parent._blockedWritersHead); num2 = ChannelUtilities.CountOperations(_parent._waitingWritersHead); } return $"WriteAsync={num}, WaitToWriteAsync={num2}"; } } internal RendezvousChannelWriter(RendezvousChannel parent) { _parent = parent; _writerSingleton = new BlockedWriteAsyncOperation(runContinuationsAsynchronously: true, default(CancellationToken), pooled: true); _waiterSingleton = new WaitingWriteAsyncOperation(runContinuationsAsynchronously: true, default(CancellationToken), pooled: true); } public override bool TryComplete(Exception error) { RendezvousChannel parent = _parent; BlockedReadAsyncOperation blockedReadersHead; BlockedWriteAsyncOperation blockedWritersHead; WaitingReadAsyncOperation waitingReadersHead; WaitingWriteAsyncOperation waitingWritersHead; lock (parent.SyncObj) { if (parent._doneWriting != null) { return false; } parent._doneWriting = error ?? ChannelUtilities.s_doneWritingSentinel; blockedReadersHead = parent._blockedReadersHead; blockedWritersHead = parent._blockedWritersHead; waitingReadersHead = parent._waitingReadersHead; waitingWritersHead = parent._waitingWritersHead; parent._blockedReadersHead = null; parent._blockedWritersHead = null; parent._waitingReadersHead = null; parent._waitingWritersHead = null; } ChannelUtilities.Complete(parent._completion, error); ChannelUtilities.FailOperations(blockedReadersHead, ChannelUtilities.CreateInvalidCompletionException(error)); ChannelUtilities.FailOperations(blockedWritersHead, ChannelUtilities.CreateInvalidCompletionException(error)); ChannelUtilities.SetOrFailOperations(waitingReadersHead, result: false, error); ChannelUtilities.SetOrFailOperations(waitingWritersHead, result: false, error); return true; } public override bool TryWrite(T item) { RendezvousChannel parent = _parent; BlockedReadAsyncOperation blockedReadAsyncOperation = null; lock (parent.SyncObj) { if (parent._doneWriting == null) { blockedReadAsyncOperation = ChannelUtilities.TryDequeueAndReserveCompletionIfCancelable(ref parent._blockedReadersHead); } } if (blockedReadAsyncOperation != null) { blockedReadAsyncOperation.DangerousSetResult(item); return true; } if (parent._dropWrites) { parent._itemDropped?.Invoke(item); return true; } return false; } public override ValueTask WaitToWriteAsync(CancellationToken cancellationToken) { if (cancellationToken.IsCancellationRequested) { return new ValueTask(Task.FromCanceled(cancellationToken)); } RendezvousChannel parent = _parent; lock (parent.SyncObj) { if (parent._doneWriting != null) { return (parent._doneWriting != ChannelUtilities.s_doneWritingSentinel) ? new ValueTask(Task.FromException(parent._doneWriting)) : default(ValueTask); } if (parent._blockedReadersHead != null || parent._dropWrites) { return new ValueTask(result: true); } WaitingWriteAsyncOperation waitingWriteAsyncOperation = ((!cancellationToken.CanBeCanceled && _waiterSingleton.TryOwnAndReset()) ? _waiterSingleton : new WaitingWriteAsyncOperation(parent._runContinuationsAsynchronously, cancellationToken, pooled: false, _parent.CancellationCallbackDelegate)); ChannelUtilities.Enqueue(ref parent._waitingWritersHead, waitingWriteAsyncOperation); return waitingWriteAsyncOperation.ValueTaskOfT; } } public override ValueTask WriteAsync(T item, CancellationToken cancellationToken) { RendezvousChannel parent = _parent; if (cancellationToken.IsCancellationRequested) { return new ValueTask(Task.FromCanceled(cancellationToken)); } BlockedWriteAsyncOperation blockedWriteAsyncOperation = null; WaitingReadAsyncOperation head = null; BlockedReadAsyncOperation blockedReadAsyncOperation = null; lock (parent.SyncObj) { if (parent._doneWriting != null) { return new ValueTask(Task.FromException(ChannelUtilities.CreateInvalidCompletionException(parent._doneWriting))); } blockedReadAsyncOperation = ChannelUtilities.TryDequeueAndReserveCompletionIfCancelable(ref parent._blockedReadersHead); if (blockedReadAsyncOperation == null && !parent._dropWrites) { blockedWriteAsyncOperation = ((!cancellationToken.CanBeCanceled && _writerSingleton.TryOwnAndReset()) ? _writerSingleton : new BlockedWriteAsyncOperation(parent._runContinuationsAsynchronously, cancellationToken, pooled: false, _parent.CancellationCallbackDelegate)); blockedWriteAsyncOperation.Item = item; ChannelUtilities.Enqueue(ref parent._blockedWritersHead, blockedWriteAsyncOperation); head = ChannelUtilities.TryReserveCompletionIfCancelable(ref parent._waitingReadersHead); } } if (blockedWriteAsyncOperation != null) { ChannelUtilities.DangerousSetOperations(head, result: true); return blockedWriteAsyncOperation.ValueTask; } if (blockedReadAsyncOperation != null) { blockedReadAsyncOperation.DangerousSetResult(item); } else { parent._itemDropped?.Invoke(item); } return default(ValueTask); } } private readonly bool _dropWrites; private readonly Action _itemDropped; private readonly TaskCompletionSource _completion; private BlockedReadAsyncOperation _blockedReadersHead; private BlockedWriteAsyncOperation _blockedWritersHead; private WaitingReadAsyncOperation _waitingReadersHead; private WaitingWriteAsyncOperation _waitingWritersHead; private readonly bool _runContinuationsAsynchronously; private Exception _doneWriting; private object SyncObj => _completion; private Action CancellationCallbackDelegate => delegate(object state, CancellationToken cancellationToken) { AsyncOperation asyncOperation = (AsyncOperation)state; if (asyncOperation.TrySetCanceled(cancellationToken)) { ChannelUtilities.UnsafeQueueUserWorkItem(delegate(KeyValuePair, AsyncOperation> state) { lock (state.Key.SyncObj) { AsyncOperation value = state.Value; if (!(value is BlockedReadAsyncOperation op)) { if (!(value is BlockedWriteAsyncOperation op2)) { if (!(value is WaitingReadAsyncOperation op3)) { if (value is WaitingWriteAsyncOperation op4) { ChannelUtilities.Remove(ref state.Key._waitingWritersHead, op4); } } else { ChannelUtilities.Remove(ref state.Key._waitingReadersHead, op3); } } else { ChannelUtilities.Remove(ref state.Key._blockedWritersHead, op2); } } else { ChannelUtilities.Remove(ref state.Key._blockedReadersHead, op); } } }, new KeyValuePair, AsyncOperation>(this, asyncOperation)); } }; private string DebuggerDisplay => ((RendezvousChannelReader)base.Reader).DebuggerDisplay + ", " + ((RendezvousChannelWriter)base.Writer).DebuggerDisplay; internal RendezvousChannel(BoundedChannelFullMode mode, bool runContinuationsAsynchronously, Action itemDropped) { _dropWrites = mode != BoundedChannelFullMode.Wait; _runContinuationsAsynchronously = runContinuationsAsynchronously; _itemDropped = itemDropped; _completion = new TaskCompletionSource(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None); base.Reader = new RendezvousChannelReader(this); base.Writer = new RendezvousChannelWriter(this); } [Conditional("DEBUG")] private void AssertInvariants() { _ = _blockedReadersHead; _ = _blockedWritersHead; _ = _completion.Task.IsCompleted; } } [DebuggerDisplay("Items = {ItemsCountForDebugger}, Closed = {ChannelIsClosedForDebugger}")] [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] internal sealed class SingleConsumerUnboundedChannel : Channel, IDebugEnumerable { [DebuggerDisplay("Items = {ItemsCountForDebugger}")] [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] private sealed class UnboundedChannelReader : ChannelReader, IDebugEnumerable { internal readonly SingleConsumerUnboundedChannel _parent; private readonly BlockedReadAsyncOperation _readerSingleton; private readonly WaitingReadAsyncOperation _waiterSingleton; public override Task Completion => _parent._completion.Task; public override bool CanPeek => true; private int ItemsCountForDebugger => _parent._items.Count; internal UnboundedChannelReader(SingleConsumerUnboundedChannel parent) { _parent = parent; _readerSingleton = new BlockedReadAsyncOperation(parent._runContinuationsAsynchronously, default(CancellationToken), pooled: true); _waiterSingleton = new WaitingReadAsyncOperation(parent._runContinuationsAsynchronously, default(CancellationToken), pooled: true); } public override ValueTask ReadAsync(CancellationToken cancellationToken) { if (cancellationToken.IsCancellationRequested) { return new ValueTask(Task.FromCanceled(cancellationToken)); } if (TryRead(out var item)) { return new ValueTask(item); } SingleConsumerUnboundedChannel parent = _parent; BlockedReadAsyncOperation blockedReadAsyncOperation; BlockedReadAsyncOperation blockedReadAsyncOperation2; lock (parent.SyncObj) { if (TryRead(out item)) { return new ValueTask(item); } if (parent._doneWriting != null) { return ChannelUtilities.GetInvalidCompletionValueTask(parent._doneWriting); } blockedReadAsyncOperation = parent._blockedReader; if (!cancellationToken.CanBeCanceled && _readerSingleton.TryOwnAndReset()) { blockedReadAsyncOperation2 = _readerSingleton; if (blockedReadAsyncOperation2 == blockedReadAsyncOperation) { blockedReadAsyncOperation = null; } } else { blockedReadAsyncOperation2 = new BlockedReadAsyncOperation(_parent._runContinuationsAsynchronously, cancellationToken, pooled: false, _parent.CancellationCallbackDelegate); } parent._blockedReader = blockedReadAsyncOperation2; } blockedReadAsyncOperation?.TrySetCanceled(); return blockedReadAsyncOperation2.ValueTaskOfT; } public override bool TryRead([MaybeNullWhen(false)] out T item) { SingleConsumerUnboundedChannel parent = _parent; if (parent._items.TryDequeue(out item)) { if (parent._doneWriting != null && parent._items.IsEmpty) { ChannelUtilities.Complete(parent._completion, parent._doneWriting); } return true; } return false; } public override bool TryPeek([MaybeNullWhen(false)] out T item) { return _parent._items.TryPeek(out item); } public override ValueTask WaitToReadAsync(CancellationToken cancellationToken) { if (cancellationToken.IsCancellationRequested) { return new ValueTask(Task.FromCanceled(cancellationToken)); } if (!_parent._items.IsEmpty) { return new ValueTask(result: true); } SingleConsumerUnboundedChannel parent = _parent; WaitingReadAsyncOperation waitingReadAsyncOperation = null; WaitingReadAsyncOperation waitingReadAsyncOperation2; lock (parent.SyncObj) { if (!parent._items.IsEmpty) { return new ValueTask(result: true); } if (parent._doneWriting != null) { return (parent._doneWriting != ChannelUtilities.s_doneWritingSentinel) ? new ValueTask(Task.FromException(parent._doneWriting)) : default(ValueTask); } waitingReadAsyncOperation = parent._waitingReader; if (!cancellationToken.CanBeCanceled && _waiterSingleton.TryOwnAndReset()) { waitingReadAsyncOperation2 = _waiterSingleton; if (waitingReadAsyncOperation2 == waitingReadAsyncOperation) { waitingReadAsyncOperation = null; } } else { waitingReadAsyncOperation2 = new WaitingReadAsyncOperation(_parent._runContinuationsAsynchronously, cancellationToken, pooled: false, _parent.CancellationCallbackDelegate); } parent._waitingReader = waitingReadAsyncOperation2; } waitingReadAsyncOperation?.TrySetCanceled(); return waitingReadAsyncOperation2.ValueTaskOfT; } IEnumerator IDebugEnumerable.GetEnumerator() { return _parent._items.GetEnumerator(); } } [DebuggerDisplay("Items = {ItemsCountForDebugger}")] [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] private sealed class UnboundedChannelWriter : ChannelWriter, IDebugEnumerable { internal readonly SingleConsumerUnboundedChannel _parent; private int ItemsCountForDebugger => _parent._items.Count; internal UnboundedChannelWriter(SingleConsumerUnboundedChannel parent) { _parent = parent; } public override bool TryComplete(Exception error) { BlockedReadAsyncOperation blockedReadAsyncOperation = null; WaitingReadAsyncOperation waitingReadAsyncOperation = null; bool flag = false; SingleConsumerUnboundedChannel parent = _parent; lock (parent.SyncObj) { if (parent._doneWriting != null) { return false; } parent._doneWriting = error ?? ChannelUtilities.s_doneWritingSentinel; if (parent._items.IsEmpty) { flag = true; if (parent._blockedReader != null) { blockedReadAsyncOperation = parent._blockedReader; parent._blockedReader = null; } if (parent._waitingReader != null) { waitingReadAsyncOperation = parent._waitingReader; parent._waitingReader = null; } } } if (flag) { ChannelUtilities.Complete(parent._completion, error); } if (blockedReadAsyncOperation != null) { error = ChannelUtilities.CreateInvalidCompletionException(error); blockedReadAsyncOperation.TrySetException(error); } if (waitingReadAsyncOperation != null) { if (error != null) { waitingReadAsyncOperation.TrySetException(error); } else { waitingReadAsyncOperation.TrySetResult(result: false); } } return true; } public override bool TryWrite(T item) { SingleConsumerUnboundedChannel parent = _parent; BlockedReadAsyncOperation blockedReadAsyncOperation; do { blockedReadAsyncOperation = null; WaitingReadAsyncOperation waitingReadAsyncOperation = null; lock (parent.SyncObj) { if (parent._doneWriting != null) { return false; } blockedReadAsyncOperation = parent._blockedReader; if (blockedReadAsyncOperation != null) { parent._blockedReader = null; } else { parent._items.Enqueue(item); waitingReadAsyncOperation = parent._waitingReader; if (waitingReadAsyncOperation == null) { return true; } parent._waitingReader = null; } } if (waitingReadAsyncOperation != null) { waitingReadAsyncOperation.TrySetResult(result: true); return true; } } while (!blockedReadAsyncOperation.TrySetResult(item)); return true; } public override ValueTask WaitToWriteAsync(CancellationToken cancellationToken) { Exception doneWriting = _parent._doneWriting; if (!cancellationToken.IsCancellationRequested) { if (doneWriting != null) { if (doneWriting == ChannelUtilities.s_doneWritingSentinel) { return default(ValueTask); } return new ValueTask(Task.FromException(doneWriting)); } return new ValueTask(result: true); } return new ValueTask(Task.FromCanceled(cancellationToken)); } public override ValueTask WriteAsync(T item, CancellationToken cancellationToken) { if (!cancellationToken.IsCancellationRequested) { if (!TryWrite(item)) { return new ValueTask(Task.FromException(ChannelUtilities.CreateInvalidCompletionException(_parent._doneWriting))); } return default(ValueTask); } return new ValueTask(Task.FromCanceled(cancellationToken)); } IEnumerator IDebugEnumerable.GetEnumerator() { return _parent._items.GetEnumerator(); } } private readonly TaskCompletionSource _completion; private readonly SingleProducerSingleConsumerQueue _items = new SingleProducerSingleConsumerQueue(); private readonly bool _runContinuationsAsynchronously; private volatile Exception _doneWriting; private BlockedReadAsyncOperation _blockedReader; private WaitingReadAsyncOperation _waitingReader; private object SyncObj => _items; private Action CancellationCallbackDelegate => delegate(object state, CancellationToken cancellationToken) { AsyncOperation asyncOperation = (AsyncOperation)state; if (asyncOperation.TrySetCanceled(cancellationToken)) { ChannelUtilities.UnsafeQueueUserWorkItem(delegate(KeyValuePair, AsyncOperation> state) { lock (state.Key.SyncObj) { AsyncOperation value = state.Value; if (!(value is BlockedReadAsyncOperation blockedReadAsyncOperation)) { if (value is WaitingReadAsyncOperation waitingReadAsyncOperation && state.Key._waitingReader == waitingReadAsyncOperation) { state.Key._waitingReader = null; } } else if (state.Key._blockedReader == blockedReadAsyncOperation) { state.Key._blockedReader = null; } } }, new KeyValuePair, AsyncOperation>(this, asyncOperation)); } }; private int ItemsCountForDebugger => _items.Count; private bool ChannelIsClosedForDebugger => _doneWriting != null; internal SingleConsumerUnboundedChannel(bool runContinuationsAsynchronously) { _runContinuationsAsynchronously = runContinuationsAsynchronously; _completion = new TaskCompletionSource(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None); base.Reader = new UnboundedChannelReader(this); base.Writer = new UnboundedChannelWriter(this); } IEnumerator IDebugEnumerable.GetEnumerator() { return _items.GetEnumerator(); } } [DebuggerDisplay("Items = {ItemsCountForDebugger}, Closed = {ChannelIsClosedForDebugger}")] [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] internal sealed class UnboundedChannel : Channel, IDebugEnumerable { [DebuggerDisplay("Items = {Count}")] [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] private sealed class UnboundedChannelReader : ChannelReader, IDebugEnumerable { internal readonly UnboundedChannel _parent; private readonly BlockedReadAsyncOperation _readerSingleton; private readonly WaitingReadAsyncOperation _waiterSingleton; public override Task Completion => _parent._completion.Task; public override bool CanCount => true; public override bool CanPeek => true; public override int Count => _parent._items.Count; internal UnboundedChannelReader(UnboundedChannel parent) { _parent = parent; _readerSingleton = new BlockedReadAsyncOperation(parent._runContinuationsAsynchronously, default(CancellationToken), pooled: true); _waiterSingleton = new WaitingReadAsyncOperation(parent._runContinuationsAsynchronously, default(CancellationToken), pooled: true); } public override ValueTask ReadAsync(CancellationToken cancellationToken) { if (cancellationToken.IsCancellationRequested) { return new ValueTask(Task.FromCanceled(cancellationToken)); } UnboundedChannel parent = _parent; if (parent._items.TryDequeue(out var result)) { CompleteIfDone(parent); return new ValueTask(result); } lock (parent.SyncObj) { if (parent._items.TryDequeue(out result)) { CompleteIfDone(parent); return new ValueTask(result); } if (parent._doneWriting != null) { return ChannelUtilities.GetInvalidCompletionValueTask(parent._doneWriting); } BlockedReadAsyncOperation blockedReadAsyncOperation = ((!cancellationToken.CanBeCanceled && _readerSingleton.TryOwnAndReset()) ? _readerSingleton : new BlockedReadAsyncOperation(parent._runContinuationsAsynchronously, cancellationToken, pooled: false, _parent.CancellationCallbackDelegate)); ChannelUtilities.Enqueue(ref parent._blockedReadersHead, blockedReadAsyncOperation); return blockedReadAsyncOperation.ValueTaskOfT; } } public override bool TryRead([MaybeNullWhen(false)] out T item) { UnboundedChannel parent = _parent; if (parent._items.TryDequeue(out item)) { CompleteIfDone(parent); return true; } item = default(T); return false; } public override bool TryPeek([MaybeNullWhen(false)] out T item) { return _parent._items.TryPeek(out item); } private static void CompleteIfDone(UnboundedChannel parent) { if (parent._doneWriting != null && parent._items.IsEmpty) { ChannelUtilities.Complete(parent._completion, parent._doneWriting); } } public override ValueTask WaitToReadAsync(CancellationToken cancellationToken) { if (cancellationToken.IsCancellationRequested) { return new ValueTask(Task.FromCanceled(cancellationToken)); } if (!_parent._items.IsEmpty) { return new ValueTask(result: true); } UnboundedChannel parent = _parent; lock (parent.SyncObj) { if (!parent._items.IsEmpty) { return new ValueTask(result: true); } if (parent._doneWriting != null) { return (parent._doneWriting != ChannelUtilities.s_doneWritingSentinel) ? new ValueTask(Task.FromException(parent._doneWriting)) : default(ValueTask); } WaitingReadAsyncOperation waitingReadAsyncOperation = ((!cancellationToken.CanBeCanceled && _waiterSingleton.TryOwnAndReset()) ? _waiterSingleton : new WaitingReadAsyncOperation(parent._runContinuationsAsynchronously, cancellationToken, pooled: false, _parent.CancellationCallbackDelegate)); ChannelUtilities.Enqueue(ref parent._waitingReadersHead, waitingReadAsyncOperation); return waitingReadAsyncOperation.ValueTaskOfT; } } IEnumerator IDebugEnumerable.GetEnumerator() { return _parent._items.GetEnumerator(); } } [DebuggerDisplay("Items = {ItemsCountForDebugger}")] [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] private sealed class UnboundedChannelWriter : ChannelWriter, IDebugEnumerable { internal readonly UnboundedChannel _parent; private int ItemsCountForDebugger => _parent._items.Count; internal UnboundedChannelWriter(UnboundedChannel parent) { _parent = parent; } public override bool TryComplete(Exception error) { UnboundedChannel parent = _parent; bool isEmpty; BlockedReadAsyncOperation blockedReadersHead; WaitingReadAsyncOperation waitingReadersHead; lock (parent.SyncObj) { if (parent._doneWriting != null) { return false; } parent._doneWriting = error ?? ChannelUtilities.s_doneWritingSentinel; isEmpty = parent._items.IsEmpty; blockedReadersHead = parent._blockedReadersHead; waitingReadersHead = parent._waitingReadersHead; parent._blockedReadersHead = null; parent._waitingReadersHead = null; } if (isEmpty) { ChannelUtilities.Complete(parent._completion, error); } ChannelUtilities.FailOperations(blockedReadersHead, ChannelUtilities.CreateInvalidCompletionException(error)); ChannelUtilities.SetOrFailOperations(waitingReadersHead, result: false, error); return true; } public override bool TryWrite(T item) { UnboundedChannel parent = _parent; BlockedReadAsyncOperation blockedReadAsyncOperation = null; WaitingReadAsyncOperation waitingReadAsyncOperation = null; lock (parent.SyncObj) { if (parent._doneWriting != null) { return false; } blockedReadAsyncOperation = ChannelUtilities.TryDequeueAndReserveCompletionIfCancelable(ref parent._blockedReadersHead); if (blockedReadAsyncOperation == null) { parent._items.Enqueue(item); waitingReadAsyncOperation = ChannelUtilities.TryReserveCompletionIfCancelable(ref parent._waitingReadersHead); } } if (blockedReadAsyncOperation != null) { blockedReadAsyncOperation.DangerousSetResult(item); } else if (waitingReadAsyncOperation != null) { ChannelUtilities.DangerousSetOperations(waitingReadAsyncOperation, result: true); } return true; } public override ValueTask WaitToWriteAsync(CancellationToken cancellationToken) { Exception doneWriting = _parent._doneWriting; if (!cancellationToken.IsCancellationRequested) { if (doneWriting != null) { if (doneWriting == ChannelUtilities.s_doneWritingSentinel) { return default(ValueTask); } return new ValueTask(Task.FromException(doneWriting)); } return new ValueTask(result: true); } return new ValueTask(Task.FromCanceled(cancellationToken)); } public override ValueTask WriteAsync(T item, CancellationToken cancellationToken) { if (!cancellationToken.IsCancellationRequested) { if (!TryWrite(item)) { return new ValueTask(Task.FromException(ChannelUtilities.CreateInvalidCompletionException(_parent._doneWriting))); } return default(ValueTask); } return new ValueTask(Task.FromCanceled(cancellationToken)); } IEnumerator IDebugEnumerable.GetEnumerator() { return _parent._items.GetEnumerator(); } } private readonly TaskCompletionSource _completion; private readonly ConcurrentQueue _items = new ConcurrentQueue(); private readonly bool _runContinuationsAsynchronously; private BlockedReadAsyncOperation _blockedReadersHead; private WaitingReadAsyncOperation _waitingReadersHead; private Exception _doneWriting; private object SyncObj => _items; private Action CancellationCallbackDelegate => delegate(object state, CancellationToken cancellationToken) { AsyncOperation asyncOperation = (AsyncOperation)state; if (asyncOperation.TrySetCanceled(cancellationToken)) { ChannelUtilities.UnsafeQueueUserWorkItem(delegate(KeyValuePair, AsyncOperation> state) { lock (state.Key.SyncObj) { AsyncOperation value = state.Value; if (!(value is BlockedReadAsyncOperation op)) { if (value is WaitingReadAsyncOperation op2) { ChannelUtilities.Remove(ref state.Key._waitingReadersHead, op2); } } else { ChannelUtilities.Remove(ref state.Key._blockedReadersHead, op); } } }, new KeyValuePair, AsyncOperation>(this, asyncOperation)); } }; private int ItemsCountForDebugger => _items.Count; private bool ChannelIsClosedForDebugger => _doneWriting != null; internal UnboundedChannel(bool runContinuationsAsynchronously) { _runContinuationsAsynchronously = runContinuationsAsynchronously; _completion = new TaskCompletionSource(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None); base.Reader = new UnboundedChannelReader(this); base.Writer = new UnboundedChannelWriter(this); } [Conditional("DEBUG")] private void AssertInvariants() { if (!_items.IsEmpty) { _ = _runContinuationsAsynchronously; } if (_blockedReadersHead != null || _waitingReadersHead != null) { _ = _runContinuationsAsynchronously; } _ = _completion.Task.IsCompleted; } IEnumerator IDebugEnumerable.GetEnumerator() { return _items.GetEnumerator(); } } internal sealed class TaskCompletionSource : TaskCompletionSource { public TaskCompletionSource(TaskCreationOptions creationOptions) : base(creationOptions) { } public bool TrySetResult() { return TrySetResult(default(VoidResult)); } } }