diff --git a/DisCatSharp.VoiceNext/VoiceNextConnection.cs b/DisCatSharp.VoiceNext/VoiceNextConnection.cs index b294a7119..0fe91533c 100644 --- a/DisCatSharp.VoiceNext/VoiceNextConnection.cs +++ b/DisCatSharp.VoiceNext/VoiceNextConnection.cs @@ -1,1365 +1,1367 @@ // This file is part of the DisCatSharp project, based off DSharpPlus. // // Copyright (c) 2021-2023 AITSYS // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in all // copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. using System; using System.Buffers; using System.Buffers.Binary; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; using DisCatSharp.Common.Utilities; using DisCatSharp.Entities; using DisCatSharp.Enums; using DisCatSharp.EventArgs; using DisCatSharp.Net; using DisCatSharp.Net.Udp; using DisCatSharp.Net.WebSocket; using DisCatSharp.VoiceNext.Codec; using DisCatSharp.VoiceNext.Entities; using DisCatSharp.VoiceNext.EventArgs; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using Newtonsoft.Json.Linq; namespace DisCatSharp.VoiceNext; internal delegate Task VoiceDisconnectedEventHandler(DiscordGuild guild); /// /// VoiceNext connection to a voice channel. /// public sealed class VoiceNextConnection : IDisposable { /// /// Triggered whenever a user speaks in the connected voice channel. /// public event AsyncEventHandler UserSpeaking { add => this._userSpeaking.Register(value); remove => this._userSpeaking.Unregister(value); } private readonly AsyncEvent _userSpeaking; /// /// Triggered whenever a user joins voice in the connected guild. /// public event AsyncEventHandler UserJoined { add => this._userJoined.Register(value); remove => this._userJoined.Unregister(value); } private readonly AsyncEvent _userJoined; /// /// Triggered whenever a user leaves voice in the connected guild. /// public event AsyncEventHandler UserLeft { add => this._userLeft.Register(value); remove => this._userLeft.Unregister(value); } private readonly AsyncEvent _userLeft; /// /// Triggered whenever voice data is received from the connected voice channel. /// public event AsyncEventHandler VoiceReceived { add => this._voiceReceived.Register(value); remove => this._voiceReceived.Unregister(value); } private readonly AsyncEvent _voiceReceived; /// /// Triggered whenever voice WebSocket throws an exception. /// public event AsyncEventHandler VoiceSocketErrored { add => this._voiceSocketError.Register(value); remove => this._voiceSocketError.Unregister(value); } private readonly AsyncEvent _voiceSocketError; internal event VoiceDisconnectedEventHandler VoiceDisconnected; /// /// Gets the unix epoch. /// private static DateTimeOffset s_unixEpoch { get; } = new(1970, 1, 1, 0, 0, 0, TimeSpan.Zero); /// /// Gets the discord. /// private readonly DiscordClient _discord; /// /// Gets the guild. /// private readonly DiscordGuild _guild; /// /// Gets the transmitting s s r cs. /// private readonly ConcurrentDictionary _transmittingSsrCs; /// /// Gets the udp client. /// private readonly BaseUdpClient _udpClient; /// /// Gets or sets the voice ws. /// private IWebSocketClient _voiceWs; /// /// Gets or sets the heartbeat task. /// private Task _heartbeatTask; /// /// Gets or sets the heartbeat interval. /// private int _heartbeatInterval; /// /// Gets or sets the last heartbeat. /// private DateTimeOffset _lastHeartbeat; /// /// Gets or sets the token source. /// private CancellationTokenSource _tokenSource; /// /// Gets the token. /// private CancellationToken TOKEN => this._tokenSource.Token; /// /// Saves the last speaking flag /// private SpeakingFlags _speakingFlags; /// /// Gets or sets the server data. /// internal VoiceServerUpdatePayload ServerData { get; set; } /// /// Gets or sets the state data. /// internal VoiceStateUpdatePayload StateData { get; set; } /// /// Gets or sets a value indicating whether resume. /// internal bool Resume { get; set; } /// /// Gets the configuration. /// private readonly VoiceNextConfiguration _configuration; /// /// Gets or sets the opus. /// private Opus _opus; /// /// Gets or sets the sodium. /// private Sodium _sodium; /// /// Gets or sets the rtp. /// private Rtp _rtp; /// /// Gets or sets the selected encryption mode. /// private EncryptionMode _selectedEncryptionMode; /// /// Gets or sets the nonce. /// private uint _nonce; /// /// Gets or sets the sequence. /// private ushort _sequence; /// /// Gets or sets the timestamp. /// private uint _timestamp; /// /// Gets or sets the s s r c. /// private uint _ssrc; /// /// Gets or sets the key. /// private byte[] _key; /// /// Gets or sets the discovered endpoint. /// private IpEndpoint _discoveredEndpoint; /// /// Gets or sets the web socket endpoint. /// internal ConnectionEndpoint WebSocketEndpoint { get; set; } /// /// Gets or sets the udp endpoint. /// internal ConnectionEndpoint UdpEndpoint { get; set; } /// /// Gets or sets the ready wait. /// private readonly TaskCompletionSource _readyWait; /// /// Gets or sets a value indicating whether is initialized. /// private bool _isInitialized; /// /// Gets or sets a value indicating whether is disposed. /// private bool _isDisposed; /// /// Gets or sets the playing wait. /// private TaskCompletionSource _playingWait; /// /// Gets the pause event. /// private readonly AsyncManualResetEvent _pauseEvent; /// /// Gets or sets the transmit stream. /// private VoiceTransmitSink _transmitStream; /// /// Gets the transmit channel. /// private readonly Channel _transmitChannel; /// /// Gets the keepalive timestamps. /// private readonly ConcurrentDictionary _keepaliveTimestamps; private ulong _lastKeepalive; /// /// Gets or sets the sender task. /// private Task _senderTask; /// /// Gets or sets the sender token source. /// private CancellationTokenSource _senderTokenSource; /// /// Gets the sender token. /// private CancellationToken SENDER_TOKEN => this._senderTokenSource.Token; /// /// Gets or sets the receiver task. /// private Task _receiverTask; /// /// Gets or sets the receiver token source. /// private CancellationTokenSource _receiverTokenSource; /// /// Gets the receiver token. /// private CancellationToken RECEIVER_TOKEN => this._receiverTokenSource.Token; /// /// Gets or sets the keepalive task. /// private Task _keepaliveTask; /// /// Gets or sets the keepalive token source. /// private CancellationTokenSource _keepaliveTokenSource; /// /// Gets the keepalive token. /// private CancellationToken KEEPALIVE_TOKEN => this._keepaliveTokenSource.Token; /// /// Gets the audio format used by the Opus encoder. /// public AudioFormat AudioFormat => this._configuration.AudioFormat; /// /// Gets whether this connection is still playing audio. /// public bool IsPlaying => this._playingWait != null && !this._playingWait.Task.IsCompleted; /// /// Gets the websocket round-trip time in ms. /// public int WebSocketPing => Volatile.Read(ref this._wsPing); private int _wsPing; /// /// Gets the UDP round-trip time in ms. /// public int UdpPing => Volatile.Read(ref this._udpPing); private int _udpPing; private int _queueCount; /// /// Gets the channel this voice client is connected to. /// public DiscordChannel TargetChannel { get; internal set; } /// /// Initializes a new instance of the class. /// /// The client. /// The guild. /// The channel. /// The config. /// The server. /// The state. internal VoiceNextConnection(DiscordClient client, DiscordGuild guild, DiscordChannel channel, VoiceNextConfiguration config, VoiceServerUpdatePayload server, VoiceStateUpdatePayload state) { this._discord = client; this._guild = guild; this.TargetChannel = channel; this._transmittingSsrCs = new ConcurrentDictionary(); this._userSpeaking = new AsyncEvent("VNEXT_USER_SPEAKING", TimeSpan.Zero, this._discord.EventErrorHandler); this._userJoined = new AsyncEvent("VNEXT_USER_JOINED", TimeSpan.Zero, this._discord.EventErrorHandler); this._userLeft = new AsyncEvent("VNEXT_USER_LEFT", TimeSpan.Zero, this._discord.EventErrorHandler); this._voiceReceived = new AsyncEvent("VNEXT_VOICE_RECEIVED", TimeSpan.Zero, this._discord.EventErrorHandler); this._voiceSocketError = new AsyncEvent("VNEXT_WS_ERROR", TimeSpan.Zero, this._discord.EventErrorHandler); this._tokenSource = new CancellationTokenSource(); this._configuration = config; this._isInitialized = false; this._isDisposed = false; this._opus = new Opus(this.AudioFormat); //this.Sodium = new Sodium(); this._rtp = new Rtp(); this.ServerData = server; this.StateData = state; var eps = this.ServerData.Endpoint; var epi = eps.LastIndexOf(':'); var eph = string.Empty; var epp = 443; if (epi != -1) { eph = eps[..epi]; epp = int.Parse(eps[(epi + 1)..]); } else { eph = eps; } this.WebSocketEndpoint = new ConnectionEndpoint { Hostname = eph, Port = epp }; this._readyWait = new TaskCompletionSource(); this._playingWait = null; this._transmitChannel = Channel.CreateBounded(new BoundedChannelOptions(this._configuration.PacketQueueSize)); this._keepaliveTimestamps = new ConcurrentDictionary(); this._pauseEvent = new AsyncManualResetEvent(true); this._udpClient = this._discord.Configuration.UdpClientFactory(); this._voiceWs = this._discord.Configuration.WebSocketClientFactory(this._discord.Configuration.Proxy, this._discord.ServiceProvider); this._voiceWs.Disconnected += this.VoiceWS_SocketClosed; this._voiceWs.MessageReceived += this.VoiceWS_SocketMessage; this._voiceWs.Connected += this.VoiceWS_SocketOpened; this._voiceWs.ExceptionThrown += this.VoiceWs_SocketException; } ~VoiceNextConnection() { this.Dispose(); } /// /// Connects to the specified voice channel. /// /// A task representing the connection operation. internal Task ConnectAsync() { var gwuri = new UriBuilder { Scheme = "wss", Host = this.WebSocketEndpoint.Hostname, Query = "encoding=json&v=4" }; return this._voiceWs.ConnectAsync(gwuri.Uri); } /// /// Reconnects . /// /// A Task. internal Task ReconnectAsync() => this._voiceWs.DisconnectAsync(); /// /// Starts . /// /// A Task. internal async Task StartAsync() { // Let's announce our intentions to the server var vdp = new VoiceDispatch(); if (!this.Resume) { vdp.OpCode = 0; vdp.Payload = new VoiceIdentifyPayload { ServerId = this.ServerData.GuildId, UserId = this.StateData.UserId.Value, SessionId = this.StateData.SessionId, Token = this.ServerData.Token }; this.Resume = true; } else { vdp.OpCode = 7; vdp.Payload = new VoiceIdentifyPayload { ServerId = this.ServerData.GuildId, SessionId = this.StateData.SessionId, Token = this.ServerData.Token }; } var vdj = JsonConvert.SerializeObject(vdp, Formatting.None); await this.WsSendAsync(vdj).ConfigureAwait(false); } /// /// Waits the for ready async. /// /// A Task. internal Task WaitForReadyAsync() => this._readyWait.Task; /// /// Enqueues the packet async. /// /// The packet. /// The token. /// A Task. internal async Task EnqueuePacketAsync(RawVoicePacket packet, CancellationToken token = default) { await this._transmitChannel.Writer.WriteAsync(packet, token).ConfigureAwait(false); this._queueCount++; } /// /// Prepares the packet. /// /// The pcm. /// The target. /// The length. /// A bool. internal bool PreparePacket(ReadOnlySpan pcm, out byte[] target, out int length) { target = null; length = 0; if (this._isDisposed) return false; var audioFormat = this.AudioFormat; var packetArray = ArrayPool.Shared.Rent(this._rtp.CalculatePacketSize(audioFormat.SampleCountToSampleSize(audioFormat.CalculateMaximumFrameSize()), this._selectedEncryptionMode)); var packet = packetArray.AsSpan(); this._rtp.EncodeHeader(this._sequence, this._timestamp, this._ssrc, packet); var opus = packet.Slice(Rtp.HEADER_SIZE, pcm.Length); this._opus.Encode(pcm, ref opus); this._sequence++; this._timestamp += (uint)audioFormat.CalculateFrameSize(audioFormat.CalculateSampleDuration(pcm.Length)); Span nonce = stackalloc byte[Sodium.NonceSize]; switch (this._selectedEncryptionMode) { case EncryptionMode.XSalsa20Poly1305: this._sodium.GenerateNonce(packet[..Rtp.HEADER_SIZE], nonce); break; case EncryptionMode.XSalsa20Poly1305Suffix: this._sodium.GenerateNonce(nonce); break; case EncryptionMode.XSalsa20Poly1305Lite: this._sodium.GenerateNonce(this._nonce++, nonce); break; default: ArrayPool.Shared.Return(packetArray); throw new Exception("Unsupported encryption mode."); } Span encrypted = stackalloc byte[Sodium.CalculateTargetSize(opus)]; this._sodium.Encrypt(opus, encrypted, nonce); encrypted.CopyTo(packet[Rtp.HEADER_SIZE..]); packet = packet[..this._rtp.CalculatePacketSize(encrypted.Length, this._selectedEncryptionMode)]; this._sodium.AppendNonce(nonce, packet, this._selectedEncryptionMode); target = packetArray; length = packet.Length; return true; } /// /// Voices the sender task. /// /// A Task. private async Task VoiceSenderTask() { var token = this.SENDER_TOKEN; var client = this._udpClient; var reader = this._transmitChannel.Reader; byte[] data = null; var length = 0; var synchronizerTicks = (double)Stopwatch.GetTimestamp(); var synchronizerResolution = Stopwatch.Frequency * 0.005; var tickResolution = 10_000_000.0 / Stopwatch.Frequency; this._discord.Logger.LogDebug(VoiceNextEvents.Misc, "Timer accuracy: {0}/{1} (high resolution? {2})", Stopwatch.Frequency, synchronizerResolution, Stopwatch.IsHighResolution); while (!token.IsCancellationRequested) { await this._pauseEvent.WaitAsync().ConfigureAwait(false); var hasPacket = reader.TryRead(out var rawPacket); if (hasPacket) { this._queueCount--; if (this._playingWait == null || this._playingWait.Task.IsCompleted) this._playingWait = new TaskCompletionSource(); } // Provided by Laura#0090 (214796473689178133); this is Python, but adaptable: // // delay = max(0, self.delay + ((start_time + self.delay * loops) + - time.time())) // // self.delay // sample size // start_time // time since streaming started // loops // number of samples sent // time.time() // DateTime.Now if (hasPacket) { hasPacket = this.PreparePacket(rawPacket.Bytes.Span, out data, out length); if (rawPacket.RentedBuffer != null) ArrayPool.Shared.Return(rawPacket.RentedBuffer); } var durationModifier = hasPacket ? rawPacket.Duration / 5 : 4; var cts = Math.Max(Stopwatch.GetTimestamp() - synchronizerTicks, 0); if (cts < synchronizerResolution * durationModifier) await Task.Delay(TimeSpan.FromTicks((long)(((synchronizerResolution * durationModifier) - cts) * tickResolution))).ConfigureAwait(false); synchronizerTicks += synchronizerResolution * durationModifier; if (!hasPacket) continue; await this.SendSpeakingAsync(this._speakingFlags != SpeakingFlags.NotSpeaking ? this._speakingFlags : SpeakingFlags.Microphone).ConfigureAwait(false); await client.SendAsync(data, length).ConfigureAwait(false); ArrayPool.Shared.Return(data); if (!rawPacket.Silence && this._queueCount == 0) { var nullpcm = new byte[this.AudioFormat.CalculateSampleSize(20)]; for (var i = 0; i < 3; i++) { var nullpacket = new byte[nullpcm.Length]; var nullpacketmem = nullpacket.AsMemory(); await this.EnqueuePacketAsync(new RawVoicePacket(nullpacketmem, 20, true)).ConfigureAwait(false); } } else if (this._queueCount == 0) { this._speakingFlags = SpeakingFlags.NotSpeaking; await this.SendSpeakingAsync(this._speakingFlags).ConfigureAwait(false); this._playingWait?.SetResult(true); } } } /// /// Processes the packet. /// /// The data. /// The opus. /// The pcm. /// The pcm packets. /// The voice sender. /// The output format. /// A bool. private bool ProcessPacket(ReadOnlySpan data, ref Memory opus, ref Memory pcm, IList> pcmPackets, out AudioSender voiceSender, out AudioFormat outputFormat) { voiceSender = null; outputFormat = default; if (!this._rtp.IsRtpHeader(data)) return false; this._rtp.DecodeHeader(data, out var shortSequence, out var timestamp, out var ssrc, out var hasExtension); if (!this._transmittingSsrCs.TryGetValue(ssrc, out var vtx)) { var decoder = this._opus.CreateDecoder(); vtx = new AudioSender(ssrc, decoder) { // user isn't present as we haven't received a speaking event yet. User = null }; } voiceSender = vtx; var sequence = vtx.GetTrueSequenceAfterWrapping(shortSequence); ushort gap = 0; if (vtx.LastTrueSequence is ulong lastTrueSequence) { if (sequence <= lastTrueSequence) // out-of-order packet; discard return false; gap = (ushort)(sequence - 1 - lastTrueSequence); if (gap >= 5) this._discord.Logger.LogWarning(VoiceNextEvents.VoiceReceiveFailure, "5 or more voice packets were dropped when receiving"); } Span nonce = stackalloc byte[Sodium.NonceSize]; this._sodium.GetNonce(data, nonce, this._selectedEncryptionMode); this._rtp.GetDataFromPacket(data, out var encryptedOpus, this._selectedEncryptionMode); var opusSize = Sodium.CalculateSourceSize(encryptedOpus); opus = opus[..opusSize]; var opusSpan = opus.Span; try { this._sodium.Decrypt(encryptedOpus, opusSpan, nonce); // Strip extensions, if any if (hasExtension) { // RFC 5285, 4.2 One-Byte header // http://www.rfcreader.com/#rfc5285_line186 if (opusSpan[0] == 0xBE && opusSpan[1] == 0xDE) { var headerLen = (opusSpan[2] << 8) | opusSpan[3]; var i = 4; for (; i < headerLen + 4; i++) { var @byte = opusSpan[i]; // ID is currently unused since we skip it anyway //var id = (byte)(@byte >> 4); var length = (byte)(@byte & 0x0F) + 1; i += length; } // Strip extension padding too while (opusSpan[i] == 0) i++; opusSpan = opusSpan[i..]; } // TODO: consider implementing RFC 5285, 4.3. Two-Byte Header } if (opusSpan[0] == 0x90) { // I'm not 100% sure what this header is/does, however removing the data causes no // real issues, and has the added benefit of removing a lot of noise. opusSpan = opusSpan[2..]; } if (gap == 1) { var lastSampleCount = this._opus.GetLastPacketSampleCount(vtx.Decoder); var fecpcm = new byte[this.AudioFormat.SampleCountToSampleSize(lastSampleCount)]; var fecpcmMem = fecpcm.AsSpan(); this._opus.Decode(vtx.Decoder, opusSpan, ref fecpcmMem, true, out _); pcmPackets.Add(fecpcm.AsMemory(0, fecpcmMem.Length)); } else if (gap > 1) { var lastSampleCount = this._opus.GetLastPacketSampleCount(vtx.Decoder); for (var i = 0; i < gap; i++) { var fecpcm = new byte[this.AudioFormat.SampleCountToSampleSize(lastSampleCount)]; var fecpcmMem = fecpcm.AsSpan(); this._opus.ProcessPacketLoss(vtx.Decoder, lastSampleCount, ref fecpcmMem); pcmPackets.Add(fecpcm.AsMemory(0, fecpcmMem.Length)); } } var pcmSpan = pcm.Span; this._opus.Decode(vtx.Decoder, opusSpan, ref pcmSpan, false, out outputFormat); pcm = pcm[..pcmSpan.Length]; } finally { vtx.LastTrueSequence = sequence; } return true; } /// /// Processes the voice packet. /// /// The data. /// A Task. private async Task ProcessVoicePacket(byte[] data) { if (data.Length < 13) // minimum packet length return; try { var pcm = new byte[this.AudioFormat.CalculateMaximumFrameSize()]; var pcmMem = pcm.AsMemory(); var opus = new byte[pcm.Length]; var opusMem = opus.AsMemory(); var pcmFillers = new List>(); if (!this.ProcessPacket(data, ref opusMem, ref pcmMem, pcmFillers, out var vtx, out var audioFormat)) return; foreach (var pcmFiller in pcmFillers) await this._voiceReceived.InvokeAsync(this, new VoiceReceiveEventArgs(this._discord.ServiceProvider) { Ssrc = vtx.Ssrc, User = vtx.User, PcmData = pcmFiller, OpusData = Array.Empty().AsMemory(), AudioFormat = audioFormat, AudioDuration = audioFormat.CalculateSampleDuration(pcmFiller.Length) }).ConfigureAwait(false); await this._voiceReceived.InvokeAsync(this, new VoiceReceiveEventArgs(this._discord.ServiceProvider) { Ssrc = vtx.Ssrc, User = vtx.User, PcmData = pcmMem, OpusData = opusMem, AudioFormat = audioFormat, AudioDuration = audioFormat.CalculateSampleDuration(pcmMem.Length) }).ConfigureAwait(false); } catch (Exception ex) { this._discord.Logger.LogError(VoiceNextEvents.VoiceReceiveFailure, ex, "Exception occurred when decoding incoming audio data"); } } /// /// Processes the keepalive. /// /// The data. private void ProcessKeepalive(byte[] data) { try { var keepalive = BinaryPrimitives.ReadUInt64LittleEndian(data); if (!this._keepaliveTimestamps.TryRemove(keepalive, out var timestamp)) return; var tdelta = (int)((Stopwatch.GetTimestamp() - timestamp) / (double)Stopwatch.Frequency * 1000); this._discord.Logger.LogDebug(VoiceNextEvents.VoiceKeepalive, "Received UDP keepalive {0} (ping {1}ms)", keepalive, tdelta); Volatile.Write(ref this._udpPing, tdelta); } catch (Exception ex) { this._discord.Logger.LogError(VoiceNextEvents.VoiceKeepalive, ex, "Exception occurred when handling keepalive"); } } /// /// Udps the receiver task. /// /// A Task. private async Task UdpReceiverTask() { var token = this.RECEIVER_TOKEN; var client = this._udpClient; while (!token.IsCancellationRequested) { var data = await client.ReceiveAsync().ConfigureAwait(false); if (data.Length == 8) this.ProcessKeepalive(data); else if (this._configuration.EnableIncoming) await this.ProcessVoicePacket(data).ConfigureAwait(false); } } /// /// Sends a speaking status to the connected voice channel. /// /// Set the speaking flags. /// A task representing the sending operation. public async Task SendSpeakingAsync(SpeakingFlags flags = SpeakingFlags.Microphone) { if (!this._isInitialized) throw new InvalidOperationException("The connection is not initialized"); - this._speakingFlags = flags; - var pld = new VoiceDispatch + if (this._speakingFlags != flags) { - OpCode = 5, - Payload = new VoiceSpeakingPayload + this._speakingFlags = flags; + var pld = new VoiceDispatch { - Speaking = flags, - Delay = 0 - } - }; + OpCode = 5, + Payload = new VoiceSpeakingPayload + { + Speaking = flags, + Delay = 0 + } + }; - var plj = JsonConvert.SerializeObject(pld, Formatting.None); - this._discord.Logger.LogDebug("Voice payload: {payload}", pld); - await this.WsSendAsync(plj).ConfigureAwait(false); + var plj = JsonConvert.SerializeObject(pld, Formatting.None); + await this.WsSendAsync(plj).ConfigureAwait(false); + } } /// /// Gets a transmit stream for this connection, optionally specifying a packet size to use with the stream. If a stream is already configured, it will return the existing one. /// /// Duration, in ms, to use for audio packets. /// Transmit stream. public VoiceTransmitSink GetTransmitSink(int sampleDuration = 20) { if (!AudioFormat.AllowedSampleDurations.Contains(sampleDuration)) throw new ArgumentOutOfRangeException(nameof(sampleDuration), "Invalid PCM sample duration specified."); this._transmitStream ??= new VoiceTransmitSink(this, sampleDuration); return this._transmitStream; } /// /// Asynchronously waits for playback to be finished. Playback is finished when speaking = false is signaled. /// /// A task representing the waiting operation. public async Task WaitForPlaybackFinishAsync() { if (this._playingWait != null) await this._playingWait.Task.ConfigureAwait(false); } /// /// Pauses playback. /// public void Pause() => this._pauseEvent.Reset(); /// /// Asynchronously resumes playback. /// /// public async Task ResumeAsync() => await this._pauseEvent.SetAsync().ConfigureAwait(false); /// /// Disconnects and disposes this voice connection. /// public void Disconnect() => this.Dispose(); /// /// Disconnects and disposes this voice connection. /// public void Dispose() { if (this._isDisposed) return; try { this._isDisposed = true; this._isInitialized = false; this._tokenSource?.Cancel(); this._senderTokenSource?.Cancel(); this._receiverTokenSource?.Cancel(); } catch (Exception ex) { this._discord.Logger.LogError(ex, ex.Message); } try { this._voiceWs.DisconnectAsync().ConfigureAwait(false).GetAwaiter().GetResult(); this._udpClient.Close(); } catch { } try { this._keepaliveTokenSource?.Cancel(); this._tokenSource?.Dispose(); this._senderTokenSource?.Dispose(); this._receiverTokenSource?.Dispose(); this._keepaliveTokenSource?.Dispose(); this._opus?.Dispose(); this._opus = null; this._sodium?.Dispose(); this._sodium = null; this._rtp?.Dispose(); this._rtp = null; } catch (Exception ex) { this._discord.Logger.LogError(ex, ex.Message); } this.VoiceDisconnected?.Invoke(this._guild); GC.SuppressFinalize(this); } /// /// Heartbeats . /// /// A Task. private async Task HeartbeatAsync() { await Task.Yield(); var token = this.TOKEN; while (true) { try { token.ThrowIfCancellationRequested(); var dt = DateTime.Now; this._discord.Logger.LogTrace(VoiceNextEvents.VoiceHeartbeat, "Sent heartbeat"); var hbd = new VoiceDispatch { OpCode = 3, Payload = UnixTimestamp(dt) }; var hbj = JsonConvert.SerializeObject(hbd); await this.WsSendAsync(hbj).ConfigureAwait(false); this._lastHeartbeat = dt; await Task.Delay(this._heartbeatInterval).ConfigureAwait(false); } catch (OperationCanceledException) { return; } } } /// /// Keepalives . /// /// A Task. private async Task KeepaliveAsync() { await Task.Yield(); var token = this.KEEPALIVE_TOKEN; var client = this._udpClient; while (!token.IsCancellationRequested) { var timestamp = Stopwatch.GetTimestamp(); var keepalive = Volatile.Read(ref this._lastKeepalive); Volatile.Write(ref this._lastKeepalive, keepalive + 1); this._keepaliveTimestamps.TryAdd(keepalive, timestamp); var packet = new byte[8]; BinaryPrimitives.WriteUInt64LittleEndian(packet, keepalive); await client.SendAsync(packet, packet.Length).ConfigureAwait(false); await Task.Delay(5000, token).ConfigureAwait(false); } } /// /// Stage1S . /// /// The voice ready. /// A Task. private async Task Stage1(VoiceReadyPayload voiceReady) { // IP Discovery this._udpClient.Setup(this.UdpEndpoint); var pck = new byte[74]; PreparePacket(pck); await this._udpClient.SendAsync(pck, pck.Length).ConfigureAwait(false); var ipd = await this._udpClient.ReceiveAsync().ConfigureAwait(false); ReadPacket(ipd, out var ip, out var port); this._discoveredEndpoint = new IpEndpoint { Address = ip, Port = port }; this._discord.Logger.LogTrace(VoiceNextEvents.VoiceHandshake, "Endpoint discovery finished - discovered endpoint is {0}:{1}", ip, port); void PreparePacket(byte[] packet) { var ssrc = this._ssrc; ushort type = 0x1; ushort length = 70; var packetSpan = packet.AsSpan(); BinaryPrimitives.WriteUInt16BigEndian(packetSpan[..2], type); BinaryPrimitives.WriteUInt16BigEndian(packetSpan.Slice(2, 2), length); BinaryPrimitives.WriteUInt32BigEndian(packetSpan.Slice(4, 4), ssrc); packetSpan[8..].Fill(0); } void ReadPacket(byte[] packet, out System.Net.IPAddress decodedIp, out ushort decodedPort) { var packetSpan = packet.AsSpan(); var ipString = Utilities.UTF8.GetString(packet, 8, 64 /* 74 - 10 */).TrimEnd('\0'); decodedIp = System.Net.IPAddress.Parse(ipString); decodedPort = BinaryPrimitives.ReadUInt16BigEndian(packetSpan[72..] /* 74 - 2 */); } // Select voice encryption mode var selectedEncryptionMode = Sodium.SelectMode(voiceReady.Modes); this._selectedEncryptionMode = selectedEncryptionMode.Value; // Ready this._discord.Logger.LogTrace(VoiceNextEvents.VoiceHandshake, "Selected encryption mode is {0}", selectedEncryptionMode.Key); var vsp = new VoiceDispatch { OpCode = 1, Payload = new VoiceSelectProtocolPayload { Protocol = "udp", Data = new VoiceSelectProtocolPayloadData { Address = this._discoveredEndpoint.Address.ToString(), Port = (ushort)this._discoveredEndpoint.Port, Mode = selectedEncryptionMode.Key } } }; var vsj = JsonConvert.SerializeObject(vsp, Formatting.None); await this.WsSendAsync(vsj).ConfigureAwait(false); this._senderTokenSource = new CancellationTokenSource(); this._senderTask = Task.Run(this.VoiceSenderTask, this.SENDER_TOKEN); this._receiverTokenSource = new CancellationTokenSource(); this._receiverTask = Task.Run(this.UdpReceiverTask, this.RECEIVER_TOKEN); } /// /// Stage2S . /// /// The voice session description. /// A Task. private async Task Stage2(VoiceSessionDescriptionPayload voiceSessionDescription) { this._selectedEncryptionMode = Sodium.SupportedModes[voiceSessionDescription.Mode.ToLowerInvariant()]; this._discord.Logger.LogTrace(VoiceNextEvents.VoiceHandshake, "Discord updated encryption mode - new mode is {0}", this._selectedEncryptionMode); // start keepalive this._keepaliveTokenSource = new CancellationTokenSource(); this._keepaliveTask = this.KeepaliveAsync(); // send 3 packets of silence to get things going var nullpcm = new byte[this.AudioFormat.CalculateSampleSize(20)]; for (var i = 0; i < 3; i++) { var nullPcm = new byte[nullpcm.Length]; var nullpacketmem = nullPcm.AsMemory(); await this.EnqueuePacketAsync(new RawVoicePacket(nullpacketmem, 20, true)).ConfigureAwait(false); } this._isInitialized = true; this._readyWait.SetResult(true); } /// /// Handles the dispatch. /// /// The jo. /// A Task. private async Task HandleDispatch(JObject jo) { var opc = (int)jo["op"]; var opp = jo["d"] as JObject; switch (opc) { case 2: // READY this._discord.Logger.LogTrace(VoiceNextEvents.VoiceDispatch, "Received READY (OP2)"); var vrp = opp.ToObject(); this._ssrc = vrp.Ssrc; this.UdpEndpoint = new ConnectionEndpoint(vrp.Address, vrp.Port); // this is not the valid interval // oh, discord //this.HeartbeatInterval = vrp.HeartbeatInterval; this._heartbeatTask = Task.Run(this.HeartbeatAsync); await this.Stage1(vrp).ConfigureAwait(false); break; case 4: // SESSION_DESCRIPTION this._discord.Logger.LogTrace(VoiceNextEvents.VoiceDispatch, "Received SESSION_DESCRIPTION (OP4)"); var vsd = opp.ToObject(); this._key = vsd.SecretKey; this._sodium = new Sodium(this._key.AsMemory()); await this.Stage2(vsd).ConfigureAwait(false); break; case 5: // SPEAKING // Don't spam OP5 // No longer spam, Discord supposedly doesn't send many of these this._discord.Logger.LogTrace(VoiceNextEvents.VoiceDispatch, "Received SPEAKING (OP5)"); var spd = opp.ToObject(); var foundUserInCache = this._discord.TryGetCachedUserInternal(spd.UserId.Value, out var resolvedUser); var spk = new UserSpeakingEventArgs(this._discord.ServiceProvider) { Speaking = spd.Speaking, Ssrc = spd.Ssrc.Value, User = resolvedUser, }; if (foundUserInCache && this._transmittingSsrCs.TryGetValue(spk.Ssrc, out var txssrc5) && txssrc5.Id == 0) { txssrc5.User = spk.User; } else { var opus = this._opus.CreateDecoder(); var vtx = new AudioSender(spk.Ssrc, opus) { User = await this._discord.GetUserAsync(spd.UserId.Value).ConfigureAwait(false) }; if (!this._transmittingSsrCs.TryAdd(spk.Ssrc, vtx)) this._opus.DestroyDecoder(opus); } await this._userSpeaking.InvokeAsync(this, spk).ConfigureAwait(false); break; case 6: // HEARTBEAT ACK var dt = DateTime.Now; var ping = (int)(dt - this._lastHeartbeat).TotalMilliseconds; Volatile.Write(ref this._wsPing, ping); this._discord.Logger.LogTrace(VoiceNextEvents.VoiceDispatch, "Received HEARTBEAT_ACK (OP6, {0}ms)", ping); this._lastHeartbeat = dt; break; case 8: // HELLO // this sends a heartbeat interval that we need to use for heartbeating this._discord.Logger.LogTrace(VoiceNextEvents.VoiceDispatch, "Received HELLO (OP8)"); this._heartbeatInterval = opp["heartbeat_interval"].ToObject(); break; case 9: // RESUMED this._discord.Logger.LogTrace(VoiceNextEvents.VoiceDispatch, "Received RESUMED (OP9)"); this._heartbeatTask = Task.Run(this.HeartbeatAsync); break; case 12: // CLIENT_CONNECTED this._discord.Logger.LogTrace(VoiceNextEvents.VoiceDispatch, "Received CLIENT_CONNECTED (OP12)"); var ujpd = opp.ToObject(); var usrj = await this._discord.GetUserAsync(ujpd.UserId).ConfigureAwait(false); { var opus = this._opus.CreateDecoder(); var vtx = new AudioSender(ujpd.Ssrc, opus) { User = usrj }; if (!this._transmittingSsrCs.TryAdd(vtx.Ssrc, vtx)) this._opus.DestroyDecoder(opus); } await this._userJoined.InvokeAsync(this, new VoiceUserJoinEventArgs(this._discord.ServiceProvider) { User = usrj, Ssrc = ujpd.Ssrc }).ConfigureAwait(false); break; case 13: // CLIENT_DISCONNECTED this._discord.Logger.LogTrace(VoiceNextEvents.VoiceDispatch, "Received CLIENT_DISCONNECTED (OP13)"); var ulpd = opp.ToObject(); var txssrc = this._transmittingSsrCs.FirstOrDefault(x => x.Value.Id == ulpd.UserId); if (this._transmittingSsrCs.ContainsKey(txssrc.Key)) { this._transmittingSsrCs.TryRemove(txssrc.Key, out var txssrc13); this._opus.DestroyDecoder(txssrc13.Decoder); } var usrl = await this._discord.GetUserAsync(ulpd.UserId).ConfigureAwait(false); await this._userLeft.InvokeAsync(this, new VoiceUserLeaveEventArgs(this._discord.ServiceProvider) { User = usrl, Ssrc = txssrc.Key }).ConfigureAwait(false); break; default: this._discord.Logger.LogTrace(VoiceNextEvents.VoiceDispatch, "Received unknown voice opcode (OP{0})", opc); break; } } /// /// Voices the w s_ socket closed. /// /// The client. /// The e. /// A Task. private async Task VoiceWS_SocketClosed(IWebSocketClient client, SocketCloseEventArgs e) { this._discord.Logger.LogDebug(VoiceNextEvents.VoiceConnectionClose, "Voice WebSocket closed ({0}, '{1}')", e.CloseCode, e.CloseMessage); // generally this should not be disposed on all disconnects, only on requested ones // or something // otherwise problems happen //this.Dispose(); if (e.CloseCode == 4006 || e.CloseCode == 4009) this.Resume = false; if (!this._isDisposed) { this._tokenSource.Cancel(); this._tokenSource = new CancellationTokenSource(); this._voiceWs = this._discord.Configuration.WebSocketClientFactory(this._discord.Configuration.Proxy, this._discord.ServiceProvider); this._voiceWs.Disconnected += this.VoiceWS_SocketClosed; this._voiceWs.MessageReceived += this.VoiceWS_SocketMessage; this._voiceWs.Connected += this.VoiceWS_SocketOpened; if (this.Resume) // emzi you dipshit await this.ConnectAsync().ConfigureAwait(false); } } /// /// Voices the w s_ socket message. /// /// The client. /// The e. /// A Task. private Task VoiceWS_SocketMessage(IWebSocketClient client, SocketMessageEventArgs e) { if (e is not SocketTextMessageEventArgs et) { this._discord.Logger.LogCritical(VoiceNextEvents.VoiceGatewayError, "Discord Voice Gateway sent binary data - unable to process"); return Task.CompletedTask; } this._discord.Logger.LogTrace(VoiceNextEvents.VoiceWsRx, et.Message); return this.HandleDispatch(JObject.Parse(et.Message)); } /// /// Voices the w s_ socket opened. /// /// The client. /// The e. /// A Task. private Task VoiceWS_SocketOpened(IWebSocketClient client, SocketEventArgs e) => this.StartAsync(); /// /// Voices the ws_ socket exception. /// /// The client. /// The e. /// A Task. private Task VoiceWs_SocketException(IWebSocketClient client, SocketErrorEventArgs e) => this._voiceSocketError.InvokeAsync(this, new SocketErrorEventArgs(this._discord.ServiceProvider) { Exception = e.Exception }); /// /// Ws the send async. /// /// The payload. /// A Task. private async Task WsSendAsync(string payload) { this._discord.Logger.LogTrace(VoiceNextEvents.VoiceWsTx, payload); await this._voiceWs.SendMessageAsync(payload).ConfigureAwait(false); } /// /// Gets the unix timestamp. /// /// The datetime. private static uint UnixTimestamp(DateTime dt) { var ts = dt - s_unixEpoch; var sd = ts.TotalSeconds; var si = (uint)sd; return si; } }