diff --git a/DisCatSharp/Clients/DiscordClient.WebSocket.cs b/DisCatSharp/Clients/DiscordClient.WebSocket.cs index daa68d7ec..aef6535d7 100644 --- a/DisCatSharp/Clients/DiscordClient.WebSocket.cs +++ b/DisCatSharp/Clients/DiscordClient.WebSocket.cs @@ -1,634 +1,634 @@ // This file is part of the DisCatSharp project, based off DSharpPlus. // // Copyright (c) 2021-2023 AITSYS // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in all // copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. using System; using System.Collections.Concurrent; using System.IO; using System.Linq; using System.Reflection; using System.Threading; using System.Threading.Tasks; using DisCatSharp.Entities; using DisCatSharp.Enums; using DisCatSharp.EventArgs; using DisCatSharp.Net.Abstractions; using DisCatSharp.Net.WebSocket; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using Sentry; namespace DisCatSharp; /// /// Represents a discord websocket client. /// public sealed partial class DiscordClient { #region Private Fields private int _heartbeatInterval; private DateTimeOffset _lastHeartbeat; private Task _heartbeatTask; internal static DateTimeOffset DiscordEpoch = new(2015, 1, 1, 0, 0, 0, TimeSpan.Zero); private int _skippedHeartbeats; private long _lastSequence; internal IWebSocketClient WebSocketClient; private PayloadDecompressor _payloadDecompressor; private CancellationTokenSource _cancelTokenSource; private CancellationToken _cancelToken; #endregion #region Connection Semaphore /// /// Gets the socket locks. /// private static ConcurrentDictionary s_socketLocks { get; } = new(); /// /// Gets the session lock. /// private readonly ManualResetEventSlim _sessionLock = new(true); #endregion #region Internal Connection Methods /// /// Reconnects the websocket client. /// /// Whether to start a new session. /// The reconnect code. /// The reconnect message. private Task InternalReconnectAsync(bool startNewSession = false, int code = 1000, string message = "") { if (startNewSession) this._sessionId = null; _ = this.WebSocketClient.DisconnectAsync(code, message); return Task.CompletedTask; } /// /// Connects the websocket client. /// internal async Task InternalConnectAsync() { using (SentrySdk.Init(o => { o.DetectStartupTime = StartupTimeDetectionMode.Fast; o.DiagnosticLevel = SentryLevel.Debug; o.Environment = "prod"; o.IsGlobalModeEnabled = true; o.TracesSampleRate = 1.0; o.ReportAssembliesMode = ReportAssembliesMode.InformationalVersion; o.Dsn = "https://1da216e26a2741b99e8ccfccea1b7ac8@o1113828.ingest.sentry.io/4504901362515968"; o.AddInAppInclude("DisCatSharp"); o.AttachStacktrace = true; + o.AutoSessionTracking = this.Configuration.EnableSentry; o.StackTraceMode = StackTraceMode.Enhanced; - //o.BeforeSend = sentryEvent => !sentryEvent.Modules.Keys.Contains("DisCatSharp") ? null : sentryEvent; var a = typeof(DiscordClient).GetTypeInfo().Assembly; var vs = ""; var iv = a.GetCustomAttribute(); if (iv != null) vs = iv.InformationalVersion; else { var v = a.GetName().Version; vs = v.ToString(3); } o.Release = $"{this.BotLibrary}@{vs}"; o.SendClientReports = true; })) { if (this.Configuration.EnableSentry) SentrySdk.StartSession(); SocketLock socketLock = null; try { if (this.GatewayInfo == null) await this.InternalUpdateGatewayAsync().ConfigureAwait(false); await this.InitializeAsync().ConfigureAwait(false); socketLock = this.GetSocketLock(); await socketLock.LockAsync().ConfigureAwait(false); SentrySdk.ConfigureScope(o => o.User = new User() { Id = this.CurrentApplication.Id.ToString(), Username = this.CurrentUser.UsernameWithDiscriminator }); //SentrySdk.CaptureMessage($"Testing {DateTime.UtcNow.Ticks}"); } catch { socketLock?.UnlockAfter(TimeSpan.Zero); throw; } if (!this.Presences.ContainsKey(this.CurrentUser.Id)) { this.PresencesInternal[this.CurrentUser.Id] = new DiscordPresence { Discord = this, RawActivity = new TransportActivity(), Activity = new DiscordActivity(), Status = UserStatus.Online, InternalUser = new UserWithIdOnly() { Id = this.CurrentUser.Id } }; } else { var pr = this.PresencesInternal[this.CurrentUser.Id]; pr.RawActivity = new TransportActivity(); pr.Activity = new DiscordActivity(); pr.Status = UserStatus.Online; } Volatile.Write(ref this._skippedHeartbeats, 0); this.WebSocketClient = this.Configuration.WebSocketClientFactory(this.Configuration.Proxy, this.ServiceProvider); this._payloadDecompressor = this.Configuration.GatewayCompressionLevel != GatewayCompressionLevel.None ? new PayloadDecompressor(this.Configuration.GatewayCompressionLevel) : null; this._cancelTokenSource = new CancellationTokenSource(); this._cancelToken = this._cancelTokenSource.Token; this.WebSocketClient.Connected += SocketOnConnect; this.WebSocketClient.Disconnected += SocketOnDisconnect; this.WebSocketClient.MessageReceived += SocketOnMessage; this.WebSocketClient.ExceptionThrown += SocketOnException; var gwuri = new QueryUriBuilder(this.GatewayUri) .AddParameter("v", this.Configuration.ApiVersion) .AddParameter("encoding", "json"); if (this.Configuration.GatewayCompressionLevel == GatewayCompressionLevel.Stream) gwuri.AddParameter("compress", "zlib-stream"); this.Logger.LogDebug(LoggerEvents.Startup, "Connecting to {gw}", this.GatewayUri.AbsoluteUri); await this.WebSocketClient.ConnectAsync(gwuri.Build()).ConfigureAwait(false); Task SocketOnConnect(IWebSocketClient sender, SocketEventArgs e) => this._socketOpened.InvokeAsync(this, e); async Task SocketOnMessage(IWebSocketClient sender, SocketMessageEventArgs e) { string msg = null; if (e is SocketTextMessageEventArgs etext) { msg = etext.Message; } else if (e is SocketBinaryMessageEventArgs ebin) { using var ms = new MemoryStream(); if (!this._payloadDecompressor.TryDecompress(new ArraySegment(ebin.Message), ms)) { this.Logger.LogError(LoggerEvents.WebSocketReceiveFailure, "Payload decompression failed"); return; } ms.Position = 0; using var sr = new StreamReader(ms, Utilities.UTF8); msg = await sr.ReadToEndAsync().ConfigureAwait(false); } try { this.Logger.LogTrace(LoggerEvents.GatewayWsRx, msg); await this.HandleSocketMessageAsync(msg).ConfigureAwait(false); } catch (Exception ex) { this.Logger.LogError(LoggerEvents.WebSocketReceiveFailure, ex, "Socket handler suppressed an exception"); if (this.Configuration.EnableSentry) SentrySdk.CaptureException(ex); } } Task SocketOnException(IWebSocketClient sender, SocketErrorEventArgs e) => this._socketErrored.InvokeAsync(this, e); async Task SocketOnDisconnect(IWebSocketClient sender, SocketCloseEventArgs e) { // release session and connection this._connectionLock.Set(); this._sessionLock.Set(); if (!this._disposed) this._cancelTokenSource.Cancel(); this.Logger.LogDebug(LoggerEvents.ConnectionClose, "Connection closed ({0}, '{1}')", e.CloseCode, e.CloseMessage); await this._socketClosed.InvokeAsync(this, e).ConfigureAwait(false); if (this.Configuration.AutoReconnect && (e.CloseCode < 4001 || e.CloseCode >= 5000)) { this.Logger.LogCritical(LoggerEvents.ConnectionClose, "Connection terminated ({0}, '{1}'), reconnecting", e.CloseCode, e.CloseMessage); if (this._status == null) await this.ConnectAsync().ConfigureAwait(false); else if (this._status.IdleSince.HasValue) await this.ConnectAsync(this._status.ActivityInternal, this._status.Status, Utilities.GetDateTimeOffsetFromMilliseconds(this._status.IdleSince.Value)).ConfigureAwait(false); else await this.ConnectAsync(this._status.ActivityInternal, this._status.Status).ConfigureAwait(false); } else { this.Logger.LogCritical(LoggerEvents.ConnectionClose, "Connection terminated ({0}, '{1}')", e.CloseCode, e.CloseMessage); } } } } #endregion #region WebSocket (Events) /// /// Handles the socket message. /// /// The data. internal async Task HandleSocketMessageAsync(string data) { var payload = JsonConvert.DeserializeObject(data); this._lastSequence = payload.Sequence ?? this._lastSequence; switch (payload.OpCode) { case GatewayOpCode.Dispatch: await Task.Run(async () => await this.HandleDispatchAsync(payload).ConfigureAwait(false)); break; case GatewayOpCode.Heartbeat: await this.OnHeartbeatAsync((long)payload.Data).ConfigureAwait(false); break; case GatewayOpCode.Reconnect: await this.OnReconnectAsync().ConfigureAwait(false); break; case GatewayOpCode.InvalidSession: await this.OnInvalidateSessionAsync((bool)payload.Data).ConfigureAwait(false); break; case GatewayOpCode.Hello: await this.OnHelloAsync((payload.Data as JObject).ToObject()).ConfigureAwait(false); break; case GatewayOpCode.HeartbeatAck: await this.OnHeartbeatAckAsync().ConfigureAwait(false); break; default: this.Logger.LogWarning(LoggerEvents.WebSocketReceive, "Unknown Discord opcode: {0}\nPayload: {1}", payload.OpCode, payload.Data); break; } } /// /// Handles the heartbeat. /// /// The sequence. internal async Task OnHeartbeatAsync(long seq) { this.Logger.LogTrace(LoggerEvents.WebSocketReceive, "Received HEARTBEAT (OP1)"); await this.SendHeartbeatAsync(seq).ConfigureAwait(false); } /// /// Handles the reconnect event. /// internal async Task OnReconnectAsync() { this.Logger.LogTrace(LoggerEvents.WebSocketReceive, "Received RECONNECT (OP7)"); await this.InternalReconnectAsync(code: 4000, message: "OP7 acknowledged").ConfigureAwait(false); } /// /// Handles the invalidate session event /// /// Unknown. Please fill documentation. internal async Task OnInvalidateSessionAsync(bool data) { // begin a session if one is not open already if (this._sessionLock.Wait(0)) this._sessionLock.Reset(); // we are sending a fresh resume/identify, so lock the socket var socketLock = this.GetSocketLock(); await socketLock.LockAsync().ConfigureAwait(false); socketLock.UnlockAfter(TimeSpan.FromSeconds(5)); if (data) { this.Logger.LogTrace(LoggerEvents.WebSocketReceive, "Received INVALID_SESSION (OP9, true)"); await Task.Delay(6000).ConfigureAwait(false); await this.SendResumeAsync().ConfigureAwait(false); } else { this.Logger.LogTrace(LoggerEvents.WebSocketReceive, "Received INVALID_SESSION (OP9, false)"); this._sessionId = null; await this.SendIdentifyAsync(this._status).ConfigureAwait(false); } } /// /// Handles the hello event. /// /// The gateway hello payload. internal async Task OnHelloAsync(GatewayHello hello) { this.Logger.LogTrace(LoggerEvents.WebSocketReceive, "Received HELLO (OP10)"); if (this._sessionLock.Wait(0)) { this._sessionLock.Reset(); this.GetSocketLock().UnlockAfter(TimeSpan.FromSeconds(5)); } else { this.Logger.LogWarning(LoggerEvents.SessionUpdate, "Attempt to start a session while another session is active"); return; } Interlocked.CompareExchange(ref this._skippedHeartbeats, 0, 0); this._heartbeatInterval = hello.HeartbeatInterval; this._heartbeatTask = Task.Run(this.HeartbeatLoopAsync, this._cancelToken); if (string.IsNullOrEmpty(this._sessionId)) await this.SendIdentifyAsync(this._status).ConfigureAwait(false); else await this.SendResumeAsync().ConfigureAwait(false); } /// /// Handles the heartbeat acknowledge event. /// internal async Task OnHeartbeatAckAsync() { Interlocked.Decrement(ref this._skippedHeartbeats); var ping = (int)(DateTime.Now - this._lastHeartbeat).TotalMilliseconds; this.Logger.LogTrace(LoggerEvents.WebSocketReceive, "Received HEARTBEAT_ACK (OP11, {0}ms)", ping); Volatile.Write(ref this._ping, ping); var args = new HeartbeatEventArgs(this.ServiceProvider) { Ping = this.Ping, Timestamp = DateTimeOffset.Now }; await this._heartbeated.InvokeAsync(this, args).ConfigureAwait(false); } /// /// Handles the heartbeat loop. /// internal async Task HeartbeatLoopAsync() { this.Logger.LogDebug(LoggerEvents.Heartbeat, "Heartbeat task started"); var token = this._cancelToken; try { while (true) { await this.SendHeartbeatAsync(this._lastSequence).ConfigureAwait(false); await Task.Delay(this._heartbeatInterval, token).ConfigureAwait(false); token.ThrowIfCancellationRequested(); } } catch (OperationCanceledException) { } } #endregion #region Internal Gateway Methods /// /// Updates the status. /// /// The activity. /// The optional user status. /// Since when is the client performing the specified activity. internal async Task InternalUpdateStatusAsync(DiscordActivity activity, UserStatus? userStatus, DateTimeOffset? idleSince) { if (activity != null && activity.Name != null && activity.Name.Length > 128) throw new Exception("Game name can't be longer than 128 characters!"); var sinceUnix = idleSince != null ? (long?)Utilities.GetUnixTime(idleSince.Value) : null; var act = activity ?? new DiscordActivity(); var status = new StatusUpdate { Activity = new TransportActivity(act), IdleSince = sinceUnix, IsAfk = idleSince != null, Status = userStatus ?? UserStatus.Online }; // Solution to have status persist between sessions this._status = status; var statusUpdate = new GatewayPayload { OpCode = GatewayOpCode.StatusUpdate, Data = status }; var statusstr = JsonConvert.SerializeObject(statusUpdate); await this.WsSendAsync(statusstr).ConfigureAwait(false); if (!this.PresencesInternal.ContainsKey(this.CurrentUser.Id)) { this.PresencesInternal[this.CurrentUser.Id] = new DiscordPresence { Discord = this, Activity = act, Status = userStatus ?? UserStatus.Online, InternalUser = new UserWithIdOnly { Id = this.CurrentUser.Id } }; } else { var pr = this.PresencesInternal[this.CurrentUser.Id]; pr.Activity = act; pr.Status = userStatus ?? pr.Status; } } /// /// Sends the heartbeat. /// /// The sequenze. internal async Task SendHeartbeatAsync(long seq) { var moreThan5 = Volatile.Read(ref this._skippedHeartbeats) > 5; var guildsComp = Volatile.Read(ref this._guildDownloadCompleted); if (guildsComp && moreThan5) { this.Logger.LogCritical(LoggerEvents.HeartbeatFailure, "Server failed to acknowledge more than 5 heartbeats - connection is zombie"); var args = new ZombiedEventArgs(this.ServiceProvider) { Failures = Volatile.Read(ref this._skippedHeartbeats), GuildDownloadCompleted = true }; await this._zombied.InvokeAsync(this, args).ConfigureAwait(false); await this.InternalReconnectAsync(code: 4001, message: "Too many heartbeats missed").ConfigureAwait(false); return; } else if (!guildsComp && moreThan5) { var args = new ZombiedEventArgs(this.ServiceProvider) { Failures = Volatile.Read(ref this._skippedHeartbeats), GuildDownloadCompleted = false }; await this._zombied.InvokeAsync(this, args).ConfigureAwait(false); this.Logger.LogWarning(LoggerEvents.HeartbeatFailure, "Server failed to acknowledge more than 5 heartbeats, but the guild download is still running - check your connection speed"); } Volatile.Write(ref this._lastSequence, seq); this.Logger.LogTrace(LoggerEvents.Heartbeat, "Sending heartbeat"); var heartbeat = new GatewayPayload { OpCode = GatewayOpCode.Heartbeat, Data = seq }; var heartbeatStr = JsonConvert.SerializeObject(heartbeat); await this.WsSendAsync(heartbeatStr).ConfigureAwait(false); this._lastHeartbeat = DateTimeOffset.Now; Interlocked.Increment(ref this._skippedHeartbeats); } /// /// Sends the identify payload. /// /// The status update payload. internal async Task SendIdentifyAsync(StatusUpdate status) { var identify = new GatewayIdentify { Token = Utilities.GetFormattedToken(this), Compress = this.Configuration.GatewayCompressionLevel == GatewayCompressionLevel.Payload, LargeThreshold = this.Configuration.LargeThreshold, ShardInfo = new ShardInfo { ShardId = this.Configuration.ShardId, ShardCount = this.Configuration.ShardCount }, Presence = status, Intents = this.Configuration.Intents, Discord = this }; var payload = new GatewayPayload { OpCode = GatewayOpCode.Identify, Data = identify }; var payloadstr = JsonConvert.SerializeObject(payload); await this.WsSendAsync(payloadstr).ConfigureAwait(false); this.Logger.LogDebug(LoggerEvents.Intents, "Registered gateway intents ({0})", this.Configuration.Intents); } /// /// Sends the resume payload. /// internal async Task SendResumeAsync() { var resume = new GatewayResume { Token = Utilities.GetFormattedToken(this), SessionId = this._sessionId, SequenceNumber = Volatile.Read(ref this._lastSequence) }; var resumePayload = new GatewayPayload { OpCode = GatewayOpCode.Resume, Data = resume }; var resumestr = JsonConvert.SerializeObject(resumePayload); this.GatewayUri = new Uri(this._resumeGatewayUrl); this.Logger.LogDebug(LoggerEvents.ConnectionClose, "Request to resume via {gw}", this.GatewayUri.AbsoluteUri); await this.WsSendAsync(resumestr).ConfigureAwait(false); } /// /// Internals the update gateway async. /// /// A Task. internal async Task InternalUpdateGatewayAsync() { var info = await this.GetGatewayInfoAsync().ConfigureAwait(false); this.GatewayInfo = info; this.GatewayUri = new Uri(info.Url); } /// /// Sends a websocket message. /// /// The payload to send. internal async Task WsSendAsync(string payload) { this.Logger.LogTrace(LoggerEvents.GatewayWsTx, payload); await this.WebSocketClient.SendMessageAsync(payload).ConfigureAwait(false); } #endregion #region Semaphore Methods /// /// Gets the socket lock. /// /// The added socket lock. private SocketLock GetSocketLock() => s_socketLocks.GetOrAdd(this.CurrentApplication.Id, appId => new SocketLock(appId, this.GatewayInfo.SessionBucket.MaxConcurrency)); #endregion } diff --git a/DisCatSharp/Entities/ObservableApiObject.cs b/DisCatSharp/Entities/ObservableApiObject.cs index ba8d36877..a0e1033a8 100644 --- a/DisCatSharp/Entities/ObservableApiObject.cs +++ b/DisCatSharp/Entities/ObservableApiObject.cs @@ -1,52 +1,52 @@ // This file is part of the DisCatSharp project, based off DSharpPlus. // // Copyright (c) 2021-2023 AITSYS // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in all // copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. using System.Collections.Generic; using Newtonsoft.Json; namespace DisCatSharp.Entities { public abstract class ObservableApiObject { /// /// Gets the client instance this object is tied to. /// [JsonIgnore] internal BaseDiscordClient Discord { get; set; } /// /// Gets additional json properties that are not known to the deserializing object. /// internal IDictionary _unknownProperties = new Dictionary(); /// /// Lets JsonConvert set the unknown properties. /// [JsonExtensionData(ReadData = true, WriteData = false)] - internal IDictionary AdditionalProperties + public IDictionary AdditionalProperties { get => this._unknownProperties; set => this._unknownProperties = value; } } }