diff --git a/DisCatSharp.VoiceNext/VoiceNextConnection.cs b/DisCatSharp.VoiceNext/VoiceNextConnection.cs
index 0fe91533c..bfdb8994f 100644
--- a/DisCatSharp.VoiceNext/VoiceNextConnection.cs
+++ b/DisCatSharp.VoiceNext/VoiceNextConnection.cs
@@ -1,1367 +1,1367 @@
// This file is part of the DisCatSharp project, based off DSharpPlus.
//
// Copyright (c) 2021-2023 AITSYS
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using DisCatSharp.Common.Utilities;
using DisCatSharp.Entities;
using DisCatSharp.Enums;
using DisCatSharp.EventArgs;
using DisCatSharp.Net;
using DisCatSharp.Net.Udp;
using DisCatSharp.Net.WebSocket;
using DisCatSharp.VoiceNext.Codec;
using DisCatSharp.VoiceNext.Entities;
using DisCatSharp.VoiceNext.EventArgs;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
namespace DisCatSharp.VoiceNext;
/// <summary>
/// Handler for the internal event raised when a voice connection is disposed.
/// </summary>
/// <param name="guild">The guild the disposed connection belonged to.</param>
/// <returns>A task representing the handler's work.</returns>
internal delegate Task VoiceDisconnectedEventHandler(DiscordGuild guild);
///
/// VoiceNext connection to a voice channel.
///
public sealed class VoiceNextConnection : IDisposable
{
///
/// Triggered whenever a user speaks in the connected voice channel.
///
public event AsyncEventHandler UserSpeaking
{
add => this._userSpeaking.Register(value);
remove => this._userSpeaking.Unregister(value);
}
private readonly AsyncEvent _userSpeaking;
///
/// Triggered whenever a user joins voice in the connected guild.
///
public event AsyncEventHandler UserJoined
{
add => this._userJoined.Register(value);
remove => this._userJoined.Unregister(value);
}
private readonly AsyncEvent _userJoined;
///
/// Triggered whenever a user leaves voice in the connected guild.
///
public event AsyncEventHandler UserLeft
{
add => this._userLeft.Register(value);
remove => this._userLeft.Unregister(value);
}
private readonly AsyncEvent _userLeft;
///
/// Triggered whenever voice data is received from the connected voice channel.
///
public event AsyncEventHandler VoiceReceived
{
add => this._voiceReceived.Register(value);
remove => this._voiceReceived.Unregister(value);
}
private readonly AsyncEvent _voiceReceived;
///
/// Triggered whenever voice WebSocket throws an exception.
///
public event AsyncEventHandler VoiceSocketErrored
{
add => this._voiceSocketError.Register(value);
remove => this._voiceSocketError.Unregister(value);
}
private readonly AsyncEvent _voiceSocketError;
internal event VoiceDisconnectedEventHandler VoiceDisconnected;
///
/// Gets the unix epoch.
///
private static DateTimeOffset s_unixEpoch { get; } = new(1970, 1, 1, 0, 0, 0, TimeSpan.Zero);
///
/// Gets the discord.
///
private readonly DiscordClient _discord;
///
/// Gets the guild.
///
private readonly DiscordGuild _guild;
///
/// Gets the audio senders known to this connection, keyed by SSRC.
///
private readonly ConcurrentDictionary _transmittingSsrCs;
///
/// Gets the udp client.
///
private readonly BaseUdpClient _udpClient;
///
/// Gets or sets the voice gateway WebSocket client.
///
private IWebSocketClient _voiceWs;
///
/// Gets or sets the heartbeat task.
///
private Task _heartbeatTask;
///
/// Gets or sets the heartbeat interval.
///
private int _heartbeatInterval;
///
/// Gets or sets the last heartbeat.
///
private DateTimeOffset _lastHeartbeat;
///
/// Gets or sets the token source.
///
private CancellationTokenSource _tokenSource;
///
/// Gets the token.
///
private CancellationToken TOKEN
=> this._tokenSource.Token;
///
/// Saves the last speaking flag
///
private SpeakingFlags _speakingFlags;
///
/// Gets or sets the server data.
///
internal VoiceServerUpdatePayload ServerData { get; set; }
///
/// Gets or sets the state data.
///
internal VoiceStateUpdatePayload StateData { get; set; }
///
/// Gets or sets a value indicating whether the next handshake resumes the existing session instead of identifying anew.
///
internal bool Resume { get; set; }
///
/// Gets the configuration.
///
private readonly VoiceNextConfiguration _configuration;
///
/// Gets or sets the opus.
///
private Opus _opus;
///
/// Gets or sets the sodium.
///
private Sodium _sodium;
///
/// Gets or sets the rtp.
///
private Rtp _rtp;
///
/// Gets or sets the selected encryption mode.
///
private EncryptionMode _selectedEncryptionMode;
///
/// Gets or sets the nonce.
///
private uint _nonce;
///
/// Gets or sets the sequence.
///
private ushort _sequence;
///
/// Gets or sets the timestamp.
///
private uint _timestamp;
///
/// Gets or sets the SSRC received in the voice server's READY payload.
///
private uint _ssrc;
///
/// Gets or sets the key.
///
private byte[] _key;
///
/// Gets or sets the discovered endpoint.
///
private IpEndpoint _discoveredEndpoint;
///
/// Gets or sets the web socket endpoint.
///
internal ConnectionEndpoint WebSocketEndpoint { get; set; }
///
/// Gets or sets the udp endpoint.
///
internal ConnectionEndpoint UdpEndpoint { get; set; }
///
/// Gets the completion source that is completed once the voice handshake has finished.
///
private readonly TaskCompletionSource _readyWait;
///
/// Gets or sets a value indicating whether the voice handshake has completed.
///
private bool _isInitialized;
///
/// Gets or sets a value indicating whether this connection has been disposed.
///
private bool _isDisposed;
///
/// Gets or sets the completion source that tracks the current playback; the sender task completes it when the transmit queue drains.
///
private TaskCompletionSource _playingWait;
///
/// Gets the pause event.
///
private readonly AsyncManualResetEvent _pauseEvent;
///
/// Gets or sets the transmit stream.
///
private VoiceTransmitSink _transmitStream;
///
/// Gets the transmit channel.
///
private readonly Channel _transmitChannel;
///
/// Gets the keepalive timestamps.
///
private readonly ConcurrentDictionary _keepaliveTimestamps;
private ulong _lastKeepalive;
///
/// Gets or sets the sender task.
///
private Task _senderTask;
///
/// Gets or sets the sender token source.
///
private CancellationTokenSource _senderTokenSource;
///
/// Gets the sender token.
///
private CancellationToken SENDER_TOKEN
=> this._senderTokenSource.Token;
///
/// Gets or sets the receiver task.
///
private Task _receiverTask;
///
/// Gets or sets the receiver token source.
///
private CancellationTokenSource _receiverTokenSource;
///
/// Gets the receiver token.
///
private CancellationToken RECEIVER_TOKEN
=> this._receiverTokenSource.Token;
///
/// Gets or sets the keepalive task.
///
private Task _keepaliveTask;
///
/// Gets or sets the keepalive token source.
///
private CancellationTokenSource _keepaliveTokenSource;
///
/// Gets the keepalive token.
///
private CancellationToken KEEPALIVE_TOKEN
=> this._keepaliveTokenSource.Token;
///
/// Gets the audio format used by the Opus encoder.
///
public AudioFormat AudioFormat => this._configuration.AudioFormat;
///
/// Gets whether this connection is still playing audio.
///
public bool IsPlaying
=> this._playingWait != null && !this._playingWait.Task.IsCompleted;
///
/// Gets the websocket round-trip time in ms.
///
public int WebSocketPing
=> Volatile.Read(ref this._wsPing);
private int _wsPing;
///
/// Gets the UDP round-trip time in ms.
///
public int UdpPing
=> Volatile.Read(ref this._udpPing);
private int _udpPing;
private int _queueCount;
///
/// Gets the channel this voice client is connected to.
///
public DiscordChannel TargetChannel { get; internal set; }
/// <summary>
/// Initializes a new instance of the <see cref="VoiceNextConnection"/> class.
/// Stores the supplied state, splits the voice server endpoint into host and port, and creates the
/// UDP and WebSocket clients with the WebSocket event handlers attached. The WebSocket itself is
/// connected later by <see cref="ConnectAsync"/>.
/// </summary>
/// <param name="client">The client.</param>
/// <param name="guild">The guild.</param>
/// <param name="channel">The channel.</param>
/// <param name="config">The config.</param>
/// <param name="server">The server.</param>
/// <param name="state">The state.</param>
internal VoiceNextConnection(DiscordClient client, DiscordGuild guild, DiscordChannel channel, VoiceNextConfiguration config, VoiceServerUpdatePayload server, VoiceStateUpdatePayload state)
{
this._discord = client;
this._guild = guild;
this.TargetChannel = channel;
this._transmittingSsrCs = new ConcurrentDictionary();
this._userSpeaking = new AsyncEvent("VNEXT_USER_SPEAKING", TimeSpan.Zero, this._discord.EventErrorHandler);
this._userJoined = new AsyncEvent("VNEXT_USER_JOINED", TimeSpan.Zero, this._discord.EventErrorHandler);
this._userLeft = new AsyncEvent("VNEXT_USER_LEFT", TimeSpan.Zero, this._discord.EventErrorHandler);
this._voiceReceived = new AsyncEvent("VNEXT_VOICE_RECEIVED", TimeSpan.Zero, this._discord.EventErrorHandler);
this._voiceSocketError = new AsyncEvent("VNEXT_WS_ERROR", TimeSpan.Zero, this._discord.EventErrorHandler);
this._tokenSource = new CancellationTokenSource();
this._configuration = config;
this._isInitialized = false;
this._isDisposed = false;
this._opus = new Opus(this.AudioFormat);
// Sodium is not created here; it is constructed once the secret key arrives (SESSION_DESCRIPTION).
//this.Sodium = new Sodium();
this._rtp = new Rtp();
this.ServerData = server;
this.StateData = state;
// The endpoint is either "host" or "host:port"; the port defaults to 443 when none is given.
var eps = this.ServerData.Endpoint;
var epi = eps.LastIndexOf(':');
var eph = string.Empty;
var epp = 443;
if (epi != -1)
{
eph = eps[..epi];
epp = int.Parse(eps[(epi + 1)..]);
}
else
{
eph = eps;
}
this.WebSocketEndpoint = new ConnectionEndpoint { Hostname = eph, Port = epp };
this._readyWait = new TaskCompletionSource();
this._playingWait = null;
// Outgoing packets are buffered in a bounded channel sized by the configured packet queue size.
this._transmitChannel = Channel.CreateBounded(new BoundedChannelOptions(this._configuration.PacketQueueSize));
this._keepaliveTimestamps = new ConcurrentDictionary();
// The pause event is created with an initial state of true, so the sender is not paused at start.
this._pauseEvent = new AsyncManualResetEvent(true);
this._udpClient = this._discord.Configuration.UdpClientFactory();
this._voiceWs = this._discord.Configuration.WebSocketClientFactory(this._discord.Configuration.Proxy, this._discord.ServiceProvider);
this._voiceWs.Disconnected += this.VoiceWS_SocketClosed;
this._voiceWs.MessageReceived += this.VoiceWS_SocketMessage;
this._voiceWs.Connected += this.VoiceWS_SocketOpened;
this._voiceWs.ExceptionThrown += this.VoiceWs_SocketException;
}
/// <summary>
/// Finalizer; runs <see cref="Dispose"/> for connections that were never disposed explicitly.
/// </summary>
// NOTE(review): Dispose blocks on the WebSocket disconnect and touches other managed objects;
// confirm that this is acceptable on the finalizer thread.
~VoiceNextConnection()
    => this.Dispose();
/// <summary>
/// Opens the voice gateway WebSocket over <c>wss</c> using the hostname of <see cref="WebSocketEndpoint"/>.
/// </summary>
/// <returns>A task representing the connection operation.</returns>
internal Task ConnectAsync()
{
    // Only the hostname is applied; no port is set on the builder.
    var gatewayUri = new UriBuilder();
    gatewayUri.Scheme = "wss";
    gatewayUri.Host = this.WebSocketEndpoint.Hostname;
    gatewayUri.Query = "encoding=json&v=4";

    return this._voiceWs.ConnectAsync(gatewayUri.Uri);
}
/// <summary>
/// Starts a reconnect by disconnecting the voice WebSocket; the socket-closed handler then decides
/// whether a new connection is opened.
/// </summary>
/// <returns>A task representing the disconnect operation.</returns>
internal Task ReconnectAsync()
{
    return this._voiceWs.DisconnectAsync();
}
/// <summary>
/// Sends the opening handshake over the voice WebSocket: an identify payload (opcode 0) on a fresh
/// session, or opcode 7 without a user id when <see cref="Resume"/> is set.
/// </summary>
/// <returns>A task representing the send operation.</returns>
internal async Task StartAsync()
{
    // Let's announce our intentions to the server
    VoiceDispatch handshake;
    if (this.Resume)
    {
        handshake = new VoiceDispatch
        {
            OpCode = 7,
            Payload = new VoiceIdentifyPayload
            {
                ServerId = this.ServerData.GuildId,
                SessionId = this.StateData.SessionId,
                Token = this.ServerData.Token
            }
        };
    }
    else
    {
        handshake = new VoiceDispatch
        {
            OpCode = 0,
            Payload = new VoiceIdentifyPayload
            {
                ServerId = this.ServerData.GuildId,
                UserId = this.StateData.UserId.Value,
                SessionId = this.StateData.SessionId,
                Token = this.ServerData.Token
            }
        };

        // Any later handshake on this connection is a resume until the flag is cleared again.
        this.Resume = true;
    }

    var json = JsonConvert.SerializeObject(handshake, Formatting.None);
    await this.WsSendAsync(json).ConfigureAwait(false);
}
/// <summary>
/// Gets the task that completes once the voice handshake has finished.
/// </summary>
/// <returns>The task of the ready completion source.</returns>
internal Task WaitForReadyAsync()
{
    return this._readyWait.Task;
}
/// <summary>
/// Writes a raw voice packet to the transmit channel and counts it as queued.
/// Waits while the bounded channel is full.
/// </summary>
/// <param name="packet">The packet to queue for the sender task.</param>
/// <param name="token">Token that cancels the wait for free space in the channel.</param>
/// <returns>A task that completes once the packet has been written.</returns>
internal async Task EnqueuePacketAsync(RawVoicePacket packet, CancellationToken token = default)
{
    await this._transmitChannel.Writer.WriteAsync(packet, token).ConfigureAwait(false);

    // The sender task changes this counter from its own thread, so a plain ++ could lose updates.
    Interlocked.Increment(ref this._queueCount);
}
/// <summary>
/// Encodes one chunk of PCM audio with Opus, encrypts it with the selected encryption mode, and
/// assembles the resulting RTP packet. Advances the RTP sequence number and timestamp.
/// </summary>
/// <param name="pcm">The PCM audio to encode.</param>
/// <param name="target">Receives the buffer holding the finished packet, rented from the shared array pool; <c>null</c> when the method returns <c>false</c>.</param>
/// <param name="length">Receives the number of valid bytes at the start of <paramref name="target"/>.</param>
/// <returns><c>false</c> when the connection is disposed; otherwise <c>true</c>.</returns>
/// <exception cref="Exception">The selected encryption mode is not one of the three handled modes.</exception>
internal bool PreparePacket(ReadOnlySpan pcm, out byte[] target, out int length)
{
target = null;
length = 0;
if (this._isDisposed)
return false;
var audioFormat = this.AudioFormat;
// Rent a buffer sized for the largest possible frame. On success the buffer is handed to the
// caller through `target`; the sender task returns it to the pool after sending.
var packetArray = ArrayPool.Shared.Rent(this._rtp.CalculatePacketSize(audioFormat.SampleCountToSampleSize(audioFormat.CalculateMaximumFrameSize()), this._selectedEncryptionMode));
var packet = packetArray.AsSpan();
this._rtp.EncodeHeader(this._sequence, this._timestamp, this._ssrc, packet);
// The Opus output is written directly behind the RTP header.
var opus = packet.Slice(Rtp.HEADER_SIZE, pcm.Length);
this._opus.Encode(pcm, ref opus);
this._sequence++;
this._timestamp += (uint)audioFormat.CalculateFrameSize(audioFormat.CalculateSampleDuration(pcm.Length));
// The nonce input depends on the mode: the RTP header, no input, or the incrementing counter.
Span nonce = stackalloc byte[Sodium.NonceSize];
switch (this._selectedEncryptionMode)
{
case EncryptionMode.XSalsa20Poly1305:
this._sodium.GenerateNonce(packet[..Rtp.HEADER_SIZE], nonce);
break;
case EncryptionMode.XSalsa20Poly1305Suffix:
this._sodium.GenerateNonce(nonce);
break;
case EncryptionMode.XSalsa20Poly1305Lite:
this._sodium.GenerateNonce(this._nonce++, nonce);
break;
default:
ArrayPool.Shared.Return(packetArray);
throw new Exception("Unsupported encryption mode.");
}
// NOTE(review): if Encode above or Encrypt below throws, packetArray is not returned to the pool.
Span encrypted = stackalloc byte[Sodium.CalculateTargetSize(opus)];
this._sodium.Encrypt(opus, encrypted, nonce);
// Overwrite the plain Opus payload with the ciphertext, then trim the span to the final packet size.
encrypted.CopyTo(packet[Rtp.HEADER_SIZE..]);
packet = packet[..this._rtp.CalculatePacketSize(encrypted.Length, this._selectedEncryptionMode)];
this._sodium.AppendNonce(nonce, packet, this._selectedEncryptionMode);
target = packetArray;
length = packet.Length;
return true;
}
/// <summary>
/// Background loop that drains the transmit channel, paces packets against the high-resolution
/// timer, sends them over UDP, and keeps the speaking state in sync with the queue.
/// Runs until the sender token is cancelled.
/// </summary>
/// <returns>A task representing the sender loop.</returns>
private async Task VoiceSenderTask()
{
    var token = this.SENDER_TOKEN;
    var client = this._udpClient;
    var reader = this._transmitChannel.Reader;
    byte[] data = null;
    var length = 0;

    // Pacing state, all in Stopwatch ticks: the running deadline, the number of ticks in 5 ms,
    // and the factor that converts Stopwatch ticks into 100 ns TimeSpan ticks.
    var synchronizerTicks = (double)Stopwatch.GetTimestamp();
    var synchronizerResolution = Stopwatch.Frequency * 0.005;
    var tickResolution = 10_000_000.0 / Stopwatch.Frequency;
    this._discord.Logger.LogDebug(VoiceNextEvents.Misc, "Timer accuracy: {0}/{1} (high resolution? {2})", Stopwatch.Frequency, synchronizerResolution, Stopwatch.IsHighResolution);

    while (!token.IsCancellationRequested)
    {
        await this._pauseEvent.WaitAsync().ConfigureAwait(false);

        var hasPacket = reader.TryRead(out var rawPacket);
        if (hasPacket)
        {
            // EnqueuePacketAsync updates this counter from other threads, so decrement atomically.
            Interlocked.Decrement(ref this._queueCount);
            if (this._playingWait == null || this._playingWait.Task.IsCompleted)
                this._playingWait = new TaskCompletionSource();
        }

        // Provided by Laura#0090 (214796473689178133); this is Python, but adaptable:
        //
        // delay = max(0, self.delay + ((start_time + self.delay * loops) + - time.time()))
        //
        // self.delay
        // sample size
        // start_time
        // time since streaming started
        // loops
        // number of samples sent
        // time.time()
        // DateTime.Now
        if (hasPacket)
        {
            // PreparePacket returns false once the connection is disposed; the packet is then dropped.
            hasPacket = this.PreparePacket(rawPacket.Bytes.Span, out data, out length);
            if (rawPacket.RentedBuffer != null)
                ArrayPool.Shared.Return(rawPacket.RentedBuffer);
        }

        // Wait in units of 5 ms: the packet's own duration, or 4 units (20 ms) when idle.
        var durationModifier = hasPacket ? rawPacket.Duration / 5 : 4;
        var cts = Math.Max(Stopwatch.GetTimestamp() - synchronizerTicks, 0);
        if (cts < synchronizerResolution * durationModifier)
            await Task.Delay(TimeSpan.FromTicks((long)(((synchronizerResolution * durationModifier) - cts) * tickResolution))).ConfigureAwait(false);
        synchronizerTicks += synchronizerResolution * durationModifier;

        if (!hasPacket)
            continue;

        await this.SendSpeakingAsync(this._speakingFlags != SpeakingFlags.NotSpeaking ? this._speakingFlags : SpeakingFlags.Microphone).ConfigureAwait(false);
        await client.SendAsync(data, length).ConfigureAwait(false);
        ArrayPool.Shared.Return(data);

        if (!rawPacket.Silence && this._queueCount == 0)
        {
            // The last audio packet left the queue empty: follow it with three frames of silence.
            var nullpcm = new byte[this.AudioFormat.CalculateSampleSize(20)];
            for (var i = 0; i < 3; i++)
            {
                var nullpacket = new byte[nullpcm.Length];
                var nullpacketmem = nullpacket.AsMemory();
                await this.EnqueuePacketAsync(new RawVoicePacket(nullpacketmem, 20, true)).ConfigureAwait(false);
            }
        }
        else if (this._queueCount == 0)
        {
            // SendSpeakingAsync only transmits when the requested flags differ from the stored ones,
            // and it stores them itself. Assigning the field first (as before) made the flags equal
            // and the "not speaking" update was never sent.
            await this.SendSpeakingAsync(SpeakingFlags.NotSpeaking).ConfigureAwait(false);
            this._playingWait?.SetResult(true);
        }
    }
}
/// <summary>
/// Decodes one incoming RTP voice packet: parses the header, resolves the sender by SSRC,
/// decrypts the payload, strips the header extension, and decodes the Opus data to PCM.
/// When packets were skipped since the sender's last one, filler PCM is produced for the gap first.
/// </summary>
/// <param name="data">The raw datagram.</param>
/// <param name="opus">Buffer for the decrypted Opus data; trimmed to the decrypted size on return.</param>
/// <param name="pcm">Buffer for the decoded PCM; trimmed to the decoded size on return.</param>
/// <param name="pcmPackets">List that receives filler PCM generated for skipped packets.</param>
/// <param name="voiceSender">Receives the sender the packet belongs to; <c>null</c> when the data is not an RTP packet.</param>
/// <param name="outputFormat">Receives the audio format reported by the decoder.</param>
/// <returns><c>false</c> when the data is not an RTP packet or the packet is not newer than the sender's last one; otherwise <c>true</c>.</returns>
private bool ProcessPacket(ReadOnlySpan data, ref Memory opus, ref Memory pcm, IList> pcmPackets, out AudioSender voiceSender, out AudioFormat outputFormat)
{
voiceSender = null;
outputFormat = default;
if (!this._rtp.IsRtpHeader(data))
return false;
this._rtp.DecodeHeader(data, out var shortSequence, out var timestamp, out var ssrc, out var hasExtension);
if (!this._transmittingSsrCs.TryGetValue(ssrc, out var vtx))
{
// NOTE(review): the sender created here is not added to _transmittingSsrCs in this method,
// so a packet from a still-unknown SSRC gets a fresh decoder each time - confirm this is intended.
var decoder = this._opus.CreateDecoder();
vtx = new AudioSender(ssrc, decoder)
{
// user isn't present as we haven't received a speaking event yet.
User = null
};
}
voiceSender = vtx;
var sequence = vtx.GetTrueSequenceAfterWrapping(shortSequence);
// Number of packets skipped between the sender's last packet and this one.
ushort gap = 0;
if (vtx.LastTrueSequence is ulong lastTrueSequence)
{
if (sequence <= lastTrueSequence) // out-of-order packet; discard
return false;
gap = (ushort)(sequence - 1 - lastTrueSequence);
if (gap >= 5)
this._discord.Logger.LogWarning(VoiceNextEvents.VoiceReceiveFailure, "5 or more voice packets were dropped when receiving");
}
Span nonce = stackalloc byte[Sodium.NonceSize];
this._sodium.GetNonce(data, nonce, this._selectedEncryptionMode);
this._rtp.GetDataFromPacket(data, out var encryptedOpus, this._selectedEncryptionMode);
var opusSize = Sodium.CalculateSourceSize(encryptedOpus);
opus = opus[..opusSize];
var opusSpan = opus.Span;
try
{
this._sodium.Decrypt(encryptedOpus, opusSpan, nonce);
// Strip extensions, if any
if (hasExtension)
{
// RFC 5285, 4.2 One-Byte header
// http://www.rfcreader.com/#rfc5285_line186
if (opusSpan[0] == 0xBE && opusSpan[1] == 0xDE)
{
var headerLen = (opusSpan[2] << 8) | opusSpan[3];
var i = 4;
for (; i < headerLen + 4; i++)
{
var @byte = opusSpan[i];
// ID is currently unused since we skip it anyway
//var id = (byte)(@byte >> 4);
var length = (byte)(@byte & 0x0F) + 1;
i += length;
}
// Strip extension padding too
while (opusSpan[i] == 0)
i++;
opusSpan = opusSpan[i..];
}
// TODO: consider implementing RFC 5285, 4.3. Two-Byte Header
}
if (opusSpan[0] == 0x90)
{
// I'm not 100% sure what this header is/does, however removing the data causes no
// real issues, and has the added benefit of removing a lot of noise.
opusSpan = opusSpan[2..];
}
if (gap == 1)
{
// Exactly one packet was skipped: decode this packet with the FEC flag set to produce the filler.
var lastSampleCount = this._opus.GetLastPacketSampleCount(vtx.Decoder);
var fecpcm = new byte[this.AudioFormat.SampleCountToSampleSize(lastSampleCount)];
var fecpcmMem = fecpcm.AsSpan();
this._opus.Decode(vtx.Decoder, opusSpan, ref fecpcmMem, true, out _);
pcmPackets.Add(fecpcm.AsMemory(0, fecpcmMem.Length));
}
else if (gap > 1)
{
// Several packets were skipped: ask the decoder for one loss-concealment frame per skipped packet.
var lastSampleCount = this._opus.GetLastPacketSampleCount(vtx.Decoder);
for (var i = 0; i < gap; i++)
{
var fecpcm = new byte[this.AudioFormat.SampleCountToSampleSize(lastSampleCount)];
var fecpcmMem = fecpcm.AsSpan();
this._opus.ProcessPacketLoss(vtx.Decoder, lastSampleCount, ref fecpcmMem);
pcmPackets.Add(fecpcm.AsMemory(0, fecpcmMem.Length));
}
}
var pcmSpan = pcm.Span;
this._opus.Decode(vtx.Decoder, opusSpan, ref pcmSpan, false, out outputFormat);
pcm = pcm[..pcmSpan.Length];
}
finally
{
// The sequence is recorded even when decryption or decoding throws.
vtx.LastTrueSequence = sequence;
}
return true;
}
/// <summary>
/// Decodes an incoming voice datagram and raises <see cref="VoiceReceived"/>: once per filler frame
/// generated for skipped packets (with empty Opus data), then once for the packet itself.
/// Exceptions are logged and not rethrown.
/// </summary>
/// <param name="data">The raw datagram.</param>
/// <returns>A task representing the processing of the packet.</returns>
private async Task ProcessVoicePacket(byte[] data)
{
if (data.Length < 13) // minimum packet length
return;
try
{
// Both scratch buffers are sized for the largest possible frame; ProcessPacket trims them.
var pcm = new byte[this.AudioFormat.CalculateMaximumFrameSize()];
var pcmMem = pcm.AsMemory();
var opus = new byte[pcm.Length];
var opusMem = opus.AsMemory();
var pcmFillers = new List>();
if (!this.ProcessPacket(data, ref opusMem, ref pcmMem, pcmFillers, out var vtx, out var audioFormat))
return;
// Filler frames come first so that handlers receive the audio in order.
foreach (var pcmFiller in pcmFillers)
await this._voiceReceived.InvokeAsync(this, new VoiceReceiveEventArgs(this._discord.ServiceProvider)
{
Ssrc = vtx.Ssrc,
User = vtx.User,
PcmData = pcmFiller,
OpusData = Array.Empty().AsMemory(),
AudioFormat = audioFormat,
AudioDuration = audioFormat.CalculateSampleDuration(pcmFiller.Length)
}).ConfigureAwait(false);
await this._voiceReceived.InvokeAsync(this, new VoiceReceiveEventArgs(this._discord.ServiceProvider)
{
Ssrc = vtx.Ssrc,
User = vtx.User,
PcmData = pcmMem,
OpusData = opusMem,
AudioFormat = audioFormat,
AudioDuration = audioFormat.CalculateSampleDuration(pcmMem.Length)
}).ConfigureAwait(false);
}
catch (Exception ex)
{
this._discord.Logger.LogError(VoiceNextEvents.VoiceReceiveFailure, ex, "Exception occurred when decoding incoming audio data");
}
}
/// <summary>
/// Handles an echoed UDP keepalive: matches the counter against the recorded send timestamp and
/// publishes the elapsed time as the UDP ping. Unknown counters are ignored; exceptions are logged.
/// </summary>
/// <param name="data">The keepalive datagram, read as a little-endian 64-bit counter.</param>
private void ProcessKeepalive(byte[] data)
{
    try
    {
        var counter = BinaryPrimitives.ReadUInt64LittleEndian(data);
        if (this._keepaliveTimestamps.TryRemove(counter, out var sentAt))
        {
            // Stopwatch ticks to milliseconds.
            var elapsedTicks = Stopwatch.GetTimestamp() - sentAt;
            var pingMs = (int)(elapsedTicks / (double)Stopwatch.Frequency * 1000);

            this._discord.Logger.LogDebug(VoiceNextEvents.VoiceKeepalive, "Received UDP keepalive {0} (ping {1}ms)", counter, pingMs);
            Volatile.Write(ref this._udpPing, pingMs);
        }
    }
    catch (Exception ex)
    {
        this._discord.Logger.LogError(VoiceNextEvents.VoiceKeepalive, ex, "Exception occurred when handling keepalive");
    }
}
/// <summary>
/// Background loop that reads datagrams from the UDP client until the receiver token is cancelled.
/// Eight-byte datagrams are treated as keepalive replies; everything else is handled as voice data
/// when incoming audio is enabled.
/// </summary>
/// <returns>A task representing the receiver loop.</returns>
private async Task UdpReceiverTask()
{
    var cancellation = this.RECEIVER_TOKEN;
    var udp = this._udpClient;

    while (!cancellation.IsCancellationRequested)
    {
        var datagram = await udp.ReceiveAsync().ConfigureAwait(false);

        if (datagram.Length == 8)
        {
            this.ProcessKeepalive(datagram);
            continue;
        }

        if (this._configuration.EnableIncoming)
            await this.ProcessVoicePacket(datagram).ConfigureAwait(false);
    }
}
/// <summary>
/// Sends a speaking status to the connected voice channel. Nothing is sent when the requested
/// flags equal the flags that were sent last.
/// </summary>
/// <param name="flags">The speaking flags to announce.</param>
/// <returns>A task representing the sending operation.</returns>
/// <exception cref="InvalidOperationException">The connection is not initialized.</exception>
public async Task SendSpeakingAsync(SpeakingFlags flags = SpeakingFlags.Microphone)
{
    if (!this._isInitialized)
        throw new InvalidOperationException("The connection is not initialized");

    // Unchanged state: no need to talk to the gateway.
    if (this._speakingFlags == flags)
        return;

    this._speakingFlags = flags;

    var dispatch = new VoiceDispatch();
    dispatch.OpCode = 5;
    dispatch.Payload = new VoiceSpeakingPayload
    {
        Speaking = flags,
        Delay = 0
    };

    var json = JsonConvert.SerializeObject(dispatch, Formatting.None);
    await this.WsSendAsync(json).ConfigureAwait(false);
}
/// <summary>
/// Gets the transmit sink for this connection, creating it on first use. When a sink already
/// exists it is returned unchanged, and <paramref name="sampleDuration"/> is only validated.
/// </summary>
/// <param name="sampleDuration">Duration, in ms, to use for audio packets.</param>
/// <returns>Transmit stream.</returns>
/// <exception cref="ArgumentOutOfRangeException">The duration is not one of the allowed sample durations.</exception>
public VoiceTransmitSink GetTransmitSink(int sampleDuration = 20)
{
    var isAllowed = AudioFormat.AllowedSampleDurations.Contains(sampleDuration);
    if (!isAllowed)
        throw new ArgumentOutOfRangeException(nameof(sampleDuration), "Invalid PCM sample duration specified.");

    if (this._transmitStream == null)
        this._transmitStream = new VoiceTransmitSink(this, sampleDuration);

    return this._transmitStream;
}
/// <summary>
/// Asynchronously waits for playback to be finished. Returns immediately when nothing has been played yet.
/// </summary>
/// <returns>A task representing the waiting operation.</returns>
public async Task WaitForPlaybackFinishAsync()
{
    if (this._playingWait == null)
        return;

    await this._playingWait.Task.ConfigureAwait(false);
}
/// <summary>
/// Pauses playback by resetting the pause event that the sender task waits on.
/// </summary>
public void Pause()
{
    this._pauseEvent.Reset();
}
/// <summary>
/// Asynchronously resumes playback by setting the pause event that the sender task waits on.
/// </summary>
/// <returns>A task representing the resume operation.</returns>
public async Task ResumeAsync()
{
    await this._pauseEvent.SetAsync().ConfigureAwait(false);
}
/// <summary>
/// Disconnects and disposes this voice connection. Equivalent to calling <see cref="Dispose"/>.
/// </summary>
public void Disconnect()
{
    this.Dispose();
}
/// <summary>
/// Disconnects and disposes this voice connection: cancels the background tasks, closes the
/// WebSocket and UDP transports, releases the codec helpers, and raises the internal
/// disconnected event. Calls after the first one return immediately.
/// </summary>
public void Dispose()
{
    if (this._isDisposed)
        return;

    try
    {
        this._isDisposed = true;
        this._isInitialized = false;
        this._tokenSource?.Cancel();
        this._senderTokenSource?.Cancel();
        this._receiverTokenSource?.Cancel();
    }
    catch (Exception ex)
    {
        this._discord.Logger.LogError(ex, ex.Message);
    }

    // Transport teardown stays best-effort, but each transport gets its own try block so that a
    // failing WebSocket disconnect can no longer skip closing the UDP client, and failures are
    // logged instead of being swallowed silently.
    try
    {
        this._voiceWs.DisconnectAsync().ConfigureAwait(false).GetAwaiter().GetResult();
    }
    catch (Exception ex)
    {
        this._discord.Logger.LogDebug(VoiceNextEvents.VoiceConnectionClose, ex, "Exception occurred while disconnecting the voice WebSocket");
    }

    try
    {
        this._udpClient.Close();
    }
    catch (Exception ex)
    {
        this._discord.Logger.LogDebug(VoiceNextEvents.VoiceConnectionClose, ex, "Exception occurred while closing the voice UDP client");
    }

    try
    {
        this._keepaliveTokenSource?.Cancel();
        this._tokenSource?.Dispose();
        this._senderTokenSource?.Dispose();
        this._receiverTokenSource?.Dispose();
        this._keepaliveTokenSource?.Dispose();
        this._opus?.Dispose();
        this._opus = null;
        this._sodium?.Dispose();
        this._sodium = null;
        this._rtp?.Dispose();
        this._rtp = null;
    }
    catch (Exception ex)
    {
        this._discord.Logger.LogError(ex, ex.Message);
    }

    this.VoiceDisconnected?.Invoke(this._guild);

    // Cleanup has run; the finalizer has nothing left to do.
    GC.SuppressFinalize(this);
}
/// <summary>
/// Background loop that sends a heartbeat (opcode 3) over the voice WebSocket every heartbeat
/// interval and records when it was sent. Ends when the connection token is cancelled.
/// </summary>
/// <returns>A task representing the heartbeat loop.</returns>
private async Task HeartbeatAsync()
{
    await Task.Yield();

    // Capture the token once: the token source is replaced when the socket closes, and this
    // loop must keep observing the source it was started under.
    var token = this.TOKEN;
    while (true)
    {
        try
        {
            token.ThrowIfCancellationRequested();

            var dt = DateTime.Now;
            this._discord.Logger.LogTrace(VoiceNextEvents.VoiceHeartbeat, "Sent heartbeat");

            var hbd = new VoiceDispatch
            {
                OpCode = 3,
                Payload = UnixTimestamp(dt)
            };
            var hbj = JsonConvert.SerializeObject(hbd);
            await this.WsSendAsync(hbj).ConfigureAwait(false);

            this._lastHeartbeat = dt;

            // Pass the token so that cancellation ends the wait immediately instead of after a
            // full interval; the resulting cancellation exception is handled below.
            await Task.Delay(this._heartbeatInterval, token).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            return;
        }
    }
}
/// <summary>
/// Background loop that sends an eight-byte keepalive counter over UDP every five seconds and
/// records the send timestamp so that the reply can be turned into a ping value.
/// Runs until the keepalive token is cancelled.
/// </summary>
/// <returns>A task representing the keepalive loop.</returns>
private async Task KeepaliveAsync()
{
    await Task.Yield();

    var token = this.KEEPALIVE_TOKEN;
    var udp = this._udpClient;

    while (!token.IsCancellationRequested)
    {
        var sentAt = Stopwatch.GetTimestamp();

        // Take the current counter value and advance the stored one.
        var counter = Volatile.Read(ref this._lastKeepalive);
        Volatile.Write(ref this._lastKeepalive, counter + 1);
        this._keepaliveTimestamps.TryAdd(counter, sentAt);

        var payload = new byte[sizeof(ulong)];
        BinaryPrimitives.WriteUInt64LittleEndian(payload, counter);
        await udp.SendAsync(payload, payload.Length).ConfigureAwait(false);

        await Task.Delay(5000, token).ConfigureAwait(false);
    }
}
/// <summary>
/// First handshake stage, run after READY: performs UDP IP discovery, selects an encryption mode
/// from the modes offered by the server, sends the protocol selection (opcode 1), and starts the
/// sender and receiver tasks.
/// </summary>
/// <param name="voiceReady">The voice ready payload; its modes are used to select the encryption mode.</param>
/// <returns>A task representing the handshake stage.</returns>
private async Task Stage1(VoiceReadyPayload voiceReady)
{
// IP Discovery
this._udpClient.Setup(this.UdpEndpoint);
// The discovery request is 74 bytes; the reply is read back with the same layout.
var pck = new byte[74];
PreparePacket(pck);
await this._udpClient.SendAsync(pck, pck.Length).ConfigureAwait(false);
var ipd = await this._udpClient.ReceiveAsync().ConfigureAwait(false);
ReadPacket(ipd, out var ip, out var port);
this._discoveredEndpoint = new IpEndpoint
{
Address = ip,
Port = port
};
this._discord.Logger.LogTrace(VoiceNextEvents.VoiceHandshake, "Endpoint discovery finished - discovered endpoint is {0}:{1}", ip, port);
// Writes the discovery request: type 0x1, length 70, and our SSRC (all big-endian), then zero padding.
void PreparePacket(byte[] packet)
{
var ssrc = this._ssrc;
ushort type = 0x1;
ushort length = 70;
var packetSpan = packet.AsSpan();
BinaryPrimitives.WriteUInt16BigEndian(packetSpan[..2], type);
BinaryPrimitives.WriteUInt16BigEndian(packetSpan.Slice(2, 2), length);
BinaryPrimitives.WriteUInt32BigEndian(packetSpan.Slice(4, 4), ssrc);
packetSpan[8..].Fill(0);
}
// Reads the reply: a null-padded address string in bytes 8..71 and a big-endian port in the last two bytes.
void ReadPacket(byte[] packet, out System.Net.IPAddress decodedIp, out ushort decodedPort)
{
var packetSpan = packet.AsSpan();
var ipString = Utilities.UTF8.GetString(packet, 8, 64 /* 74 - 10 */).TrimEnd('\0');
decodedIp = System.Net.IPAddress.Parse(ipString);
decodedPort = BinaryPrimitives.ReadUInt16BigEndian(packetSpan[72..] /* 74 - 2 */);
}
// Select voice encryption mode
var selectedEncryptionMode = Sodium.SelectMode(voiceReady.Modes);
this._selectedEncryptionMode = selectedEncryptionMode.Value;
// Ready
this._discord.Logger.LogTrace(VoiceNextEvents.VoiceHandshake, "Selected encryption mode is {0}", selectedEncryptionMode.Key);
var vsp = new VoiceDispatch
{
OpCode = 1,
Payload = new VoiceSelectProtocolPayload
{
Protocol = "udp",
Data = new VoiceSelectProtocolPayloadData
{
Address = this._discoveredEndpoint.Address.ToString(),
Port = (ushort)this._discoveredEndpoint.Port,
Mode = selectedEncryptionMode.Key
}
}
};
var vsj = JsonConvert.SerializeObject(vsp, Formatting.None);
await this.WsSendAsync(vsj).ConfigureAwait(false);
// The background loops start only after the protocol selection has been sent.
this._senderTokenSource = new CancellationTokenSource();
this._senderTask = Task.Run(this.VoiceSenderTask, this.SENDER_TOKEN);
this._receiverTokenSource = new CancellationTokenSource();
this._receiverTask = Task.Run(this.UdpReceiverTask, this.RECEIVER_TOKEN);
}
/// <summary>
/// Second handshake stage, run after SESSION_DESCRIPTION: adopts the encryption mode named in the
/// payload, starts the UDP keepalive loop, queues three frames of silence, and marks the
/// connection as initialized and ready.
/// </summary>
/// <param name="voiceSessionDescription">The voice session description.</param>
/// <returns>A task representing the handshake stage.</returns>
private async Task Stage2(VoiceSessionDescriptionPayload voiceSessionDescription)
{
    var modeName = voiceSessionDescription.Mode.ToLowerInvariant();
    this._selectedEncryptionMode = Sodium.SupportedModes[modeName];
    this._discord.Logger.LogTrace(VoiceNextEvents.VoiceHandshake, "Discord updated encryption mode - new mode is {0}", this._selectedEncryptionMode);

    // start keepalive
    this._keepaliveTokenSource = new CancellationTokenSource();
    this._keepaliveTask = this.KeepaliveAsync();

    // send 3 packets of silence to get things going
    var silenceSize = this.AudioFormat.CalculateSampleSize(20);
    for (var sent = 0; sent < 3; sent++)
    {
        var silence = new byte[silenceSize];
        await this.EnqueuePacketAsync(new RawVoicePacket(silence.AsMemory(), 20, true)).ConfigureAwait(false);
    }

    this._isInitialized = true;
    this._readyWait.SetResult(true);
}
/// <summary>
/// Handles one message from the voice gateway by dispatching on its opcode ("op") with the
/// payload object ("d"). Unknown opcodes are only logged.
/// </summary>
/// <param name="jo">The parsed gateway message.</param>
/// <returns>A task representing the handling of the message.</returns>
private async Task HandleDispatch(JObject jo)
{
var opc = (int)jo["op"];
var opp = jo["d"] as JObject;
switch (opc)
{
case 2: // READY
// Stores the SSRC and UDP endpoint, starts heartbeating, and runs the first handshake stage.
this._discord.Logger.LogTrace(VoiceNextEvents.VoiceDispatch, "Received READY (OP2)");
var vrp = opp.ToObject();
this._ssrc = vrp.Ssrc;
this.UdpEndpoint = new ConnectionEndpoint(vrp.Address, vrp.Port);
// this is not the valid interval
// oh, discord
//this.HeartbeatInterval = vrp.HeartbeatInterval;
this._heartbeatTask = Task.Run(this.HeartbeatAsync);
await this.Stage1(vrp).ConfigureAwait(false);
break;
case 4: // SESSION_DESCRIPTION
// Stores the secret key, creates the Sodium helper from it, and runs the second handshake stage.
this._discord.Logger.LogTrace(VoiceNextEvents.VoiceDispatch, "Received SESSION_DESCRIPTION (OP4)");
var vsd = opp.ToObject();
this._key = vsd.SecretKey;
this._sodium = new Sodium(this._key.AsMemory());
await this.Stage2(vsd).ConfigureAwait(false);
break;
case 5: // SPEAKING
// Don't spam OP5
// No longer spam, Discord supposedly doesn't send many of these
this._discord.Logger.LogTrace(VoiceNextEvents.VoiceDispatch, "Received SPEAKING (OP5)");
var spd = opp.ToObject();
var foundUserInCache = this._discord.TryGetCachedUserInternal(spd.UserId.Value, out var resolvedUser);
var spk = new UserSpeakingEventArgs(this._discord.ServiceProvider)
{
Speaking = spd.Speaking,
Ssrc = spd.Ssrc.Value,
User = resolvedUser,
};
// A registered sender whose Id is still 0 only needs its user filled in. In every other case a
// new sender is built and registered; if the SSRC is already registered, the new decoder is destroyed.
if (foundUserInCache && this._transmittingSsrCs.TryGetValue(spk.Ssrc, out var txssrc5) && txssrc5.Id == 0)
{
txssrc5.User = spk.User;
}
else
{
var opus = this._opus.CreateDecoder();
var vtx = new AudioSender(spk.Ssrc, opus)
{
- User = await this._discord.GetUserAsync(spd.UserId.Value).ConfigureAwait(false)
+ User = await this._discord.GetUserAsync(spd.UserId.Value, true).ConfigureAwait(false)
};
if (!this._transmittingSsrCs.TryAdd(spk.Ssrc, vtx))
this._opus.DestroyDecoder(opus);
}
await this._userSpeaking.InvokeAsync(this, spk).ConfigureAwait(false);
break;
case 6: // HEARTBEAT ACK
// The WebSocket ping is the time elapsed since the last heartbeat was sent.
var dt = DateTime.Now;
var ping = (int)(dt - this._lastHeartbeat).TotalMilliseconds;
Volatile.Write(ref this._wsPing, ping);
this._discord.Logger.LogTrace(VoiceNextEvents.VoiceDispatch, "Received HEARTBEAT_ACK (OP6, {0}ms)", ping);
this._lastHeartbeat = dt;
break;
case 8: // HELLO
// this sends a heartbeat interval that we need to use for heartbeating
this._discord.Logger.LogTrace(VoiceNextEvents.VoiceDispatch, "Received HELLO (OP8)");
this._heartbeatInterval = opp["heartbeat_interval"].ToObject();
break;
case 9: // RESUMED
// A resumed session only needs heartbeating restarted.
this._discord.Logger.LogTrace(VoiceNextEvents.VoiceDispatch, "Received RESUMED (OP9)");
this._heartbeatTask = Task.Run(this.HeartbeatAsync);
break;
case 12: // CLIENT_CONNECTED
// Registers a sender with its own decoder for the joining user, then raises UserJoined.
this._discord.Logger.LogTrace(VoiceNextEvents.VoiceDispatch, "Received CLIENT_CONNECTED (OP12)");
var ujpd = opp.ToObject();
- var usrj = await this._discord.GetUserAsync(ujpd.UserId).ConfigureAwait(false);
+ var usrj = await this._discord.GetUserAsync(ujpd.UserId, true).ConfigureAwait(false);
{
var opus = this._opus.CreateDecoder();
var vtx = new AudioSender(ujpd.Ssrc, opus)
{
User = usrj
};
if (!this._transmittingSsrCs.TryAdd(vtx.Ssrc, vtx))
this._opus.DestroyDecoder(opus);
}
await this._userJoined.InvokeAsync(this, new VoiceUserJoinEventArgs(this._discord.ServiceProvider) { User = usrj, Ssrc = ujpd.Ssrc }).ConfigureAwait(false);
break;
case 13: // CLIENT_DISCONNECTED
// Removes the leaving user's sender (looked up by user id) and destroys its decoder, then raises UserLeft.
// When no sender matches, FirstOrDefault yields a default pair and its default key is reported as the SSRC.
this._discord.Logger.LogTrace(VoiceNextEvents.VoiceDispatch, "Received CLIENT_DISCONNECTED (OP13)");
var ulpd = opp.ToObject();
var txssrc = this._transmittingSsrCs.FirstOrDefault(x => x.Value.Id == ulpd.UserId);
if (this._transmittingSsrCs.ContainsKey(txssrc.Key))
{
this._transmittingSsrCs.TryRemove(txssrc.Key, out var txssrc13);
this._opus.DestroyDecoder(txssrc13.Decoder);
}
- var usrl = await this._discord.GetUserAsync(ulpd.UserId).ConfigureAwait(false);
+ var usrl = await this._discord.GetUserAsync(ulpd.UserId, true).ConfigureAwait(false);
await this._userLeft.InvokeAsync(this, new VoiceUserLeaveEventArgs(this._discord.ServiceProvider)
{
User = usrl,
Ssrc = txssrc.Key
}).ConfigureAwait(false);
break;
default:
this._discord.Logger.LogTrace(VoiceNextEvents.VoiceDispatch, "Received unknown voice opcode (OP{0})", opc);
break;
}
}
/// <summary>
/// Handles the voice WebSocket closing. Unless the connection is disposed, the connection token is
/// cancelled and replaced, a fresh WebSocket client is created with all handlers attached, and the
/// connection is re-opened when the session can be resumed.
/// </summary>
/// <param name="client">The client that closed.</param>
/// <param name="e">The close event arguments carrying the close code and message.</param>
/// <returns>A task representing the handling of the close event.</returns>
private async Task VoiceWS_SocketClosed(IWebSocketClient client, SocketCloseEventArgs e)
{
    this._discord.Logger.LogDebug(VoiceNextEvents.VoiceConnectionClose, "Voice WebSocket closed ({0}, '{1}')", e.CloseCode, e.CloseMessage);

    // generally this should not be disposed on all disconnects, only on requested ones
    // or something
    // otherwise problems happen
    //this.Dispose();

    // After these close codes the session is not resumed, so no reconnect is attempted below.
    if (e.CloseCode == 4006 || e.CloseCode == 4009)
        this.Resume = false;

    if (!this._isDisposed)
    {
        // Cancelling stops the heartbeat loop that was started under the old token.
        this._tokenSource.Cancel();
        this._tokenSource = new CancellationTokenSource();

        this._voiceWs = this._discord.Configuration.WebSocketClientFactory(this._discord.Configuration.Proxy, this._discord.ServiceProvider);
        this._voiceWs.Disconnected += this.VoiceWS_SocketClosed;
        this._voiceWs.MessageReceived += this.VoiceWS_SocketMessage;
        this._voiceWs.Connected += this.VoiceWS_SocketOpened;
        // Wire the exception handler as the constructor does; without it, errors from the
        // replacement socket never reach VoiceSocketErrored subscribers.
        this._voiceWs.ExceptionThrown += this.VoiceWs_SocketException;

        // Only reconnect when the session can be resumed.
        if (this.Resume)
            await this.ConnectAsync().ConfigureAwait(false);
    }
}
/// <summary>
/// Handles a message from the voice WebSocket. Text messages are logged, parsed as JSON, and
/// dispatched; anything else is reported as a critical error and dropped.
/// </summary>
/// <param name="client">The client that received the message.</param>
/// <param name="e">The message event arguments.</param>
/// <returns>The dispatch task for text messages; otherwise a completed task.</returns>
private Task VoiceWS_SocketMessage(IWebSocketClient client, SocketMessageEventArgs e)
{
    if (e is SocketTextMessageEventArgs textMessage)
    {
        this._discord.Logger.LogTrace(VoiceNextEvents.VoiceWsRx, textMessage.Message);
        return this.HandleDispatch(JObject.Parse(textMessage.Message));
    }

    this._discord.Logger.LogCritical(VoiceNextEvents.VoiceGatewayError, "Discord Voice Gateway sent binary data - unable to process");
    return Task.CompletedTask;
}
/// <summary>
/// Handles the voice WebSocket opening by sending the opening handshake.
/// </summary>
/// <param name="client">The client that connected.</param>
/// <param name="e">The event arguments.</param>
/// <returns>The task of the handshake send.</returns>
private Task VoiceWS_SocketOpened(IWebSocketClient client, SocketEventArgs e)
{
    return this.StartAsync();
}
/// <summary>
/// Forwards an exception raised by the voice WebSocket to the <see cref="VoiceSocketErrored"/> event.
/// </summary>
/// <param name="client">The client that raised the exception.</param>
/// <param name="e">The event arguments carrying the exception.</param>
/// <returns>The task of the event invocation.</returns>
private Task VoiceWs_SocketException(IWebSocketClient client, SocketErrorEventArgs e)
{
    var forwarded = new SocketErrorEventArgs(this._discord.ServiceProvider) { Exception = e.Exception };
    return this._voiceSocketError.InvokeAsync(this, forwarded);
}
/// <summary>
/// Logs a payload at trace level and sends it over the voice WebSocket.
/// </summary>
/// <param name="payload">The serialized payload to send.</param>
/// <returns>A task representing the send operation.</returns>
private async Task WsSendAsync(string payload)
{
    this._discord.Logger.LogTrace(VoiceNextEvents.VoiceWsTx, payload);

    var pendingSend = this._voiceWs.SendMessageAsync(payload);
    await pendingSend.ConfigureAwait(false);
}
/// <summary>
/// Converts a date and time to the number of whole seconds elapsed since the unix epoch.
/// </summary>
/// <param name="dt">The datetime.</param>
/// <returns>The elapsed seconds, truncated to an unsigned 32-bit integer.</returns>
private static uint UnixTimestamp(DateTime dt)
{
    var sinceEpoch = dt - s_unixEpoch;
    return (uint)sinceEpoch.TotalSeconds;
}
}