src/third-party/discord-rpc/src/discord_rpc.cpp (view raw)
1#include "discord_rpc.h"
2
3#include "backoff.h"
4#include "discord_register.h"
5#include "msg_queue.h"
6#include "rpc_connection.h"
7#include "serialization.h"
8
9#include <atomic>
10#include <chrono>
11#include <mutex>
12
13#ifndef DISCORD_DISABLE_IO_THREAD
14#include <condition_variable>
15#include <thread>
16#endif
17
18constexpr size_t MaxMessageSize{16 * 1024};
19constexpr size_t MessageQueueSize{8};
20constexpr size_t JoinQueueSize{8};
21
22struct QueuedMessage {
23 size_t length;
24 char buffer[MaxMessageSize];
25
26 void Copy(const QueuedMessage& other)
27 {
28 length = other.length;
29 if (length) {
30 memcpy(buffer, other.buffer, length);
31 }
32 }
33};
34
35struct User {
36 // snowflake (64bit int), turned into a ascii decimal string, at most 20 chars +1 null
37 // terminator = 21
38 char userId[32];
39 // 32 unicode glyphs is max name size => 4 bytes per glyph in the worst case, +1 for null
40 // terminator = 129
41 char username[344];
42 // 4 decimal digits + 1 null terminator = 5
43 char discriminator[8];
44 // optional 'a_' + md5 hex digest (32 bytes) + null terminator = 35
45 char avatar[128];
46 // Rounded way up because I'm paranoid about games breaking from future changes in these sizes
47};
48
49static RpcConnection* Connection{nullptr};
50static DiscordEventHandlers QueuedHandlers{};
51static DiscordEventHandlers Handlers{};
52static std::atomic_bool WasJustConnected{false};
53static std::atomic_bool WasJustDisconnected{false};
54static std::atomic_bool GotErrorMessage{false};
55static std::atomic_bool WasJoinGame{false};
56static std::atomic_bool WasSpectateGame{false};
57static std::atomic_bool UpdatePresence{false};
58static char JoinGameSecret[256];
59static char SpectateGameSecret[256];
60static int LastErrorCode{0};
61static char LastErrorMessage[256];
62static int LastDisconnectErrorCode{0};
63static char LastDisconnectErrorMessage[256];
64static std::mutex PresenceMutex;
65static std::mutex HandlerMutex;
66static QueuedMessage QueuedPresence{};
67static MsgQueue<QueuedMessage, MessageQueueSize> SendQueue;
68static MsgQueue<User, JoinQueueSize> JoinAskQueue;
69static User connectedUser;
70
71// We want to auto connect, and retry on failure, but not as fast as possible. This does expoential
72// backoff from 0.5 seconds to 1 minute
73static Backoff ReconnectTimeMs(500, 60 * 1000);
74static auto NextConnect = std::chrono::system_clock::now();
75static int Pid{0};
76static int Nonce{1};
77
78#ifndef DISCORD_DISABLE_IO_THREAD
79static void Discord_UpdateConnection(void);
80class IoThreadHolder {
81private:
82 std::atomic_bool keepRunning{true};
83 std::mutex waitForIOMutex;
84 std::condition_variable waitForIOActivity;
85 std::thread ioThread;
86
87public:
88 void Start()
89 {
90 keepRunning.store(true);
91 ioThread = std::thread([&]() {
92 const std::chrono::duration<int64_t, std::milli> maxWait{500LL};
93 Discord_UpdateConnection();
94 while (keepRunning.load()) {
95 std::unique_lock<std::mutex> lock(waitForIOMutex);
96 waitForIOActivity.wait_for(lock, maxWait);
97 Discord_UpdateConnection();
98 }
99 });
100 }
101
102 void Notify() { waitForIOActivity.notify_all(); }
103
104 void Stop()
105 {
106 keepRunning.exchange(false);
107 Notify();
108 if (ioThread.joinable()) {
109 ioThread.join();
110 }
111 }
112
113 ~IoThreadHolder() { Stop(); }
114};
115#else
116class IoThreadHolder {
117public:
118 void Start() {}
119 void Stop() {}
120 void Notify() {}
121};
122#endif // DISCORD_DISABLE_IO_THREAD
123static IoThreadHolder* IoThread{nullptr};
124
125static void UpdateReconnectTime()
126{
127 NextConnect = std::chrono::system_clock::now() +
128 std::chrono::duration<int64_t, std::milli>{ReconnectTimeMs.nextDelay()};
129}
130
131#ifdef DISCORD_DISABLE_IO_THREAD
132extern "C" DISCORD_EXPORT void Discord_UpdateConnection(void)
133#else
134static void Discord_UpdateConnection(void)
135#endif
136{
137 if (!Connection) {
138 return;
139 }
140
141 if (!Connection->IsOpen()) {
142 if (std::chrono::system_clock::now() >= NextConnect) {
143 UpdateReconnectTime();
144 Connection->Open();
145 }
146 }
147 else {
148 // reads
149
150 for (;;) {
151 JsonDocument message;
152
153 if (!Connection->Read(message)) {
154 break;
155 }
156
157 const char* evtName = GetStrMember(&message, "evt");
158 const char* nonce = GetStrMember(&message, "nonce");
159
160 if (nonce) {
161 // in responses only -- should use to match up response when needed.
162
163 if (evtName && strcmp(evtName, "ERROR") == 0) {
164 auto data = GetObjMember(&message, "data");
165 LastErrorCode = GetIntMember(data, "code");
166 StringCopy(LastErrorMessage, GetStrMember(data, "message", ""));
167 GotErrorMessage.store(true);
168 }
169 }
170 else {
171 // should have evt == name of event, optional data
172 if (evtName == nullptr) {
173 continue;
174 }
175
176 auto data = GetObjMember(&message, "data");
177
178 if (strcmp(evtName, "ACTIVITY_JOIN") == 0) {
179 auto secret = GetStrMember(data, "secret");
180 if (secret) {
181 StringCopy(JoinGameSecret, secret);
182 WasJoinGame.store(true);
183 }
184 }
185 else if (strcmp(evtName, "ACTIVITY_SPECTATE") == 0) {
186 auto secret = GetStrMember(data, "secret");
187 if (secret) {
188 StringCopy(SpectateGameSecret, secret);
189 WasSpectateGame.store(true);
190 }
191 }
192 else if (strcmp(evtName, "ACTIVITY_JOIN_REQUEST") == 0) {
193 auto user = GetObjMember(data, "user");
194 auto userId = GetStrMember(user, "id");
195 auto username = GetStrMember(user, "username");
196 auto avatar = GetStrMember(user, "avatar");
197 auto joinReq = JoinAskQueue.GetNextAddMessage();
198 if (userId && username && joinReq) {
199 StringCopy(joinReq->userId, userId);
200 StringCopy(joinReq->username, username);
201 auto discriminator = GetStrMember(user, "discriminator");
202 if (discriminator) {
203 StringCopy(joinReq->discriminator, discriminator);
204 }
205 if (avatar) {
206 StringCopy(joinReq->avatar, avatar);
207 }
208 else {
209 joinReq->avatar[0] = 0;
210 }
211 JoinAskQueue.CommitAdd();
212 }
213 }
214 }
215 }
216
217 // writes
218 if (UpdatePresence.exchange(false) && QueuedPresence.length) {
219 QueuedMessage local;
220 {
221 std::lock_guard<std::mutex> guard(PresenceMutex);
222 local.Copy(QueuedPresence);
223 }
224 if (!Connection->Write(local.buffer, local.length)) {
225 // if we fail to send, requeue
226 std::lock_guard<std::mutex> guard(PresenceMutex);
227 QueuedPresence.Copy(local);
228 UpdatePresence.exchange(true);
229 }
230 }
231
232 while (SendQueue.HavePendingSends()) {
233 auto qmessage = SendQueue.GetNextSendMessage();
234 Connection->Write(qmessage->buffer, qmessage->length);
235 SendQueue.CommitSend();
236 }
237 }
238}
239
240static void SignalIOActivity()
241{
242 if (IoThread != nullptr) {
243 IoThread->Notify();
244 }
245}
246
247static bool RegisterForEvent(const char* evtName)
248{
249 auto qmessage = SendQueue.GetNextAddMessage();
250 if (qmessage) {
251 qmessage->length =
252 JsonWriteSubscribeCommand(qmessage->buffer, sizeof(qmessage->buffer), Nonce++, evtName);
253 SendQueue.CommitAdd();
254 SignalIOActivity();
255 return true;
256 }
257 return false;
258}
259
260static bool DeregisterForEvent(const char* evtName)
261{
262 auto qmessage = SendQueue.GetNextAddMessage();
263 if (qmessage) {
264 qmessage->length =
265 JsonWriteUnsubscribeCommand(qmessage->buffer, sizeof(qmessage->buffer), Nonce++, evtName);
266 SendQueue.CommitAdd();
267 SignalIOActivity();
268 return true;
269 }
270 return false;
271}
272
273extern "C" DISCORD_EXPORT void Discord_Initialize(const char* applicationId,
274 DiscordEventHandlers* handlers,
275 int autoRegister,
276 const char* optionalSteamId)
277{
278 IoThread = new (std::nothrow) IoThreadHolder();
279 if (IoThread == nullptr) {
280 return;
281 }
282
283 if (autoRegister) {
284 if (optionalSteamId && optionalSteamId[0]) {
285 Discord_RegisterSteamGame(applicationId, optionalSteamId);
286 }
287 else {
288 Discord_Register(applicationId, nullptr);
289 }
290 }
291
292 Pid = GetProcessId();
293
294 {
295 std::lock_guard<std::mutex> guard(HandlerMutex);
296
297 if (handlers) {
298 QueuedHandlers = *handlers;
299 }
300 else {
301 QueuedHandlers = {};
302 }
303
304 Handlers = {};
305 }
306
307 if (Connection) {
308 return;
309 }
310
311 Connection = RpcConnection::Create(applicationId);
312 Connection->onConnect = [](JsonDocument& readyMessage) {
313 Discord_UpdateHandlers(&QueuedHandlers);
314 if (QueuedPresence.length > 0) {
315 UpdatePresence.exchange(true);
316 SignalIOActivity();
317 }
318 auto data = GetObjMember(&readyMessage, "data");
319 auto user = GetObjMember(data, "user");
320 auto userId = GetStrMember(user, "id");
321 auto username = GetStrMember(user, "username");
322 auto avatar = GetStrMember(user, "avatar");
323 if (userId && username) {
324 StringCopy(connectedUser.userId, userId);
325 StringCopy(connectedUser.username, username);
326 auto discriminator = GetStrMember(user, "discriminator");
327 if (discriminator) {
328 StringCopy(connectedUser.discriminator, discriminator);
329 }
330 if (avatar) {
331 StringCopy(connectedUser.avatar, avatar);
332 }
333 else {
334 connectedUser.avatar[0] = 0;
335 }
336 }
337 WasJustConnected.exchange(true);
338 ReconnectTimeMs.reset();
339 };
340 Connection->onDisconnect = [](int err, const char* message) {
341 LastDisconnectErrorCode = err;
342 StringCopy(LastDisconnectErrorMessage, message);
343 WasJustDisconnected.exchange(true);
344 UpdateReconnectTime();
345 };
346
347 IoThread->Start();
348}
349
350extern "C" DISCORD_EXPORT void Discord_Shutdown(void)
351{
352 if (!Connection) {
353 return;
354 }
355 Connection->onConnect = nullptr;
356 Connection->onDisconnect = nullptr;
357 Handlers = {};
358 QueuedPresence.length = 0;
359 UpdatePresence.exchange(false);
360 if (IoThread != nullptr) {
361 IoThread->Stop();
362 delete IoThread;
363 IoThread = nullptr;
364 }
365
366 RpcConnection::Destroy(Connection);
367}
368
369extern "C" DISCORD_EXPORT void Discord_UpdatePresence(const DiscordRichPresence* presence)
370{
371 {
372 std::lock_guard<std::mutex> guard(PresenceMutex);
373 QueuedPresence.length = JsonWriteRichPresenceObj(
374 QueuedPresence.buffer, sizeof(QueuedPresence.buffer), Nonce++, Pid, presence);
375 UpdatePresence.exchange(true);
376 }
377 SignalIOActivity();
378}
379
380extern "C" DISCORD_EXPORT void Discord_ClearPresence(void)
381{
382 Discord_UpdatePresence(nullptr);
383}
384
385extern "C" DISCORD_EXPORT void Discord_Respond(const char* userId, /* DISCORD_REPLY_ */ int reply)
386{
387 // if we are not connected, let's not batch up stale messages for later
388 if (!Connection || !Connection->IsOpen()) {
389 return;
390 }
391 auto qmessage = SendQueue.GetNextAddMessage();
392 if (qmessage) {
393 qmessage->length =
394 JsonWriteJoinReply(qmessage->buffer, sizeof(qmessage->buffer), userId, reply, Nonce++);
395 SendQueue.CommitAdd();
396 SignalIOActivity();
397 }
398}
399
400extern "C" DISCORD_EXPORT void Discord_RunCallbacks(void)
401{
402 // Note on some weirdness: internally we might connect, get other signals, disconnect any number
403 // of times inbetween calls here. Externally, we want the sequence to seem sane, so any other
404 // signals are book-ended by calls to ready and disconnect.
405
406 if (!Connection) {
407 return;
408 }
409
410 bool wasDisconnected = WasJustDisconnected.exchange(false);
411 bool isConnected = Connection->IsOpen();
412
413 if (isConnected) {
414 // if we are connected, disconnect cb first
415 std::lock_guard<std::mutex> guard(HandlerMutex);
416 if (wasDisconnected && Handlers.disconnected) {
417 Handlers.disconnected(LastDisconnectErrorCode, LastDisconnectErrorMessage);
418 }
419 }
420
421 if (WasJustConnected.exchange(false)) {
422 std::lock_guard<std::mutex> guard(HandlerMutex);
423 if (Handlers.ready) {
424 DiscordUser du{connectedUser.userId,
425 connectedUser.username,
426 connectedUser.discriminator,
427 connectedUser.avatar};
428 Handlers.ready(&du);
429 }
430 }
431
432 if (GotErrorMessage.exchange(false)) {
433 std::lock_guard<std::mutex> guard(HandlerMutex);
434 if (Handlers.errored) {
435 Handlers.errored(LastErrorCode, LastErrorMessage);
436 }
437 }
438
439 if (WasJoinGame.exchange(false)) {
440 std::lock_guard<std::mutex> guard(HandlerMutex);
441 if (Handlers.joinGame) {
442 Handlers.joinGame(JoinGameSecret);
443 }
444 }
445
446 if (WasSpectateGame.exchange(false)) {
447 std::lock_guard<std::mutex> guard(HandlerMutex);
448 if (Handlers.spectateGame) {
449 Handlers.spectateGame(SpectateGameSecret);
450 }
451 }
452
453 // Right now this batches up any requests and sends them all in a burst; I could imagine a world
454 // where the implementer would rather sequentially accept/reject each one before the next invite
455 // is sent. I left it this way because I could also imagine wanting to process these all and
456 // maybe show them in one common dialog and/or start fetching the avatars in parallel, and if
457 // not it should be trivial for the implementer to make a queue themselves.
458 while (JoinAskQueue.HavePendingSends()) {
459 auto req = JoinAskQueue.GetNextSendMessage();
460 {
461 std::lock_guard<std::mutex> guard(HandlerMutex);
462 if (Handlers.joinRequest) {
463 DiscordUser du{req->userId, req->username, req->discriminator, req->avatar};
464 Handlers.joinRequest(&du);
465 }
466 }
467 JoinAskQueue.CommitSend();
468 }
469
470 if (!isConnected) {
471 // if we are not connected, disconnect message last
472 std::lock_guard<std::mutex> guard(HandlerMutex);
473 if (wasDisconnected && Handlers.disconnected) {
474 Handlers.disconnected(LastDisconnectErrorCode, LastDisconnectErrorMessage);
475 }
476 }
477}
478
479extern "C" DISCORD_EXPORT void Discord_UpdateHandlers(DiscordEventHandlers* newHandlers)
480{
481 if (newHandlers) {
482#define HANDLE_EVENT_REGISTRATION(handler_name, event) \
483 if (!Handlers.handler_name && newHandlers->handler_name) { \
484 RegisterForEvent(event); \
485 } \
486 else if (Handlers.handler_name && !newHandlers->handler_name) { \
487 DeregisterForEvent(event); \
488 }
489
490 std::lock_guard<std::mutex> guard(HandlerMutex);
491 HANDLE_EVENT_REGISTRATION(joinGame, "ACTIVITY_JOIN")
492 HANDLE_EVENT_REGISTRATION(spectateGame, "ACTIVITY_SPECTATE")
493 HANDLE_EVENT_REGISTRATION(joinRequest, "ACTIVITY_JOIN_REQUEST")
494
495#undef HANDLE_EVENT_REGISTRATION
496
497 Handlers = *newHandlers;
498 }
499 else {
500 std::lock_guard<std::mutex> guard(HandlerMutex);
501 Handlers = {};
502 }
503 return;
504}