all repos — mgba @ a98c01bafc031dc9bfa379b960bc88f1731a62e3

mGBA Game Boy Advance Emulator

src/third-party/discord-rpc/src/discord_rpc.cpp (view raw)

  1#include "discord_rpc.h"
  2
  3#include "backoff.h"
  4#include "discord_register.h"
  5#include "msg_queue.h"
  6#include "rpc_connection.h"
  7#include "serialization.h"
  8
  9#include <atomic>
 10#include <chrono>
 11#include <mutex>
 12
 13#ifndef DISCORD_DISABLE_IO_THREAD
 14#include <condition_variable>
 15#include <thread>
 16#endif
 17
 18constexpr size_t MaxMessageSize{16 * 1024};
 19constexpr size_t MessageQueueSize{8};
 20constexpr size_t JoinQueueSize{8};
 21
 22struct QueuedMessage {
 23    size_t length;
 24    char buffer[MaxMessageSize];
 25
 26    void Copy(const QueuedMessage& other)
 27    {
 28        length = other.length;
 29        if (length) {
 30            memcpy(buffer, other.buffer, length);
 31        }
 32    }
 33};
 34
 35struct User {
 36    // snowflake (64bit int), turned into a ascii decimal string, at most 20 chars +1 null
 37    // terminator = 21
 38    char userId[32];
 39    // 32 unicode glyphs is max name size => 4 bytes per glyph in the worst case, +1 for null
 40    // terminator = 129
 41    char username[344];
 42    // 4 decimal digits + 1 null terminator = 5
 43    char discriminator[8];
 44    // optional 'a_' + md5 hex digest (32 bytes) + null terminator = 35
 45    char avatar[128];
 46    // Rounded way up because I'm paranoid about games breaking from future changes in these sizes
 47};
 48
 49static RpcConnection* Connection{nullptr};
 50static DiscordEventHandlers QueuedHandlers{};
 51static DiscordEventHandlers Handlers{};
 52static std::atomic_bool WasJustConnected{false};
 53static std::atomic_bool WasJustDisconnected{false};
 54static std::atomic_bool GotErrorMessage{false};
 55static std::atomic_bool WasJoinGame{false};
 56static std::atomic_bool WasSpectateGame{false};
 57static std::atomic_bool UpdatePresence{false};
 58static char JoinGameSecret[256];
 59static char SpectateGameSecret[256];
 60static int LastErrorCode{0};
 61static char LastErrorMessage[256];
 62static int LastDisconnectErrorCode{0};
 63static char LastDisconnectErrorMessage[256];
 64static std::mutex PresenceMutex;
 65static std::mutex HandlerMutex;
 66static QueuedMessage QueuedPresence{};
 67static MsgQueue<QueuedMessage, MessageQueueSize> SendQueue;
 68static MsgQueue<User, JoinQueueSize> JoinAskQueue;
 69static User connectedUser;
 70
 71// We want to auto connect, and retry on failure, but not as fast as possible. This does expoential
 72// backoff from 0.5 seconds to 1 minute
 73static Backoff ReconnectTimeMs(500, 60 * 1000);
 74static auto NextConnect = std::chrono::system_clock::now();
 75static int Pid{0};
 76static int Nonce{1};
 77
 78#ifndef DISCORD_DISABLE_IO_THREAD
 79static void Discord_UpdateConnection(void);
 80class IoThreadHolder {
 81private:
 82    std::atomic_bool keepRunning{true};
 83    std::mutex waitForIOMutex;
 84    std::condition_variable waitForIOActivity;
 85    std::thread ioThread;
 86
 87public:
 88    void Start()
 89    {
 90        keepRunning.store(true);
 91        ioThread = std::thread([&]() {
 92            const std::chrono::duration<int64_t, std::milli> maxWait{500LL};
 93            Discord_UpdateConnection();
 94            while (keepRunning.load()) {
 95                std::unique_lock<std::mutex> lock(waitForIOMutex);
 96                waitForIOActivity.wait_for(lock, maxWait);
 97                Discord_UpdateConnection();
 98            }
 99        });
100    }
101
102    void Notify() { waitForIOActivity.notify_all(); }
103
104    void Stop()
105    {
106        keepRunning.exchange(false);
107        Notify();
108        if (ioThread.joinable()) {
109            ioThread.join();
110        }
111    }
112
113    ~IoThreadHolder() { Stop(); }
114};
115#else
116class IoThreadHolder {
117public:
118    void Start() {}
119    void Stop() {}
120    void Notify() {}
121};
122#endif // DISCORD_DISABLE_IO_THREAD
123static IoThreadHolder* IoThread{nullptr};
124
125static void UpdateReconnectTime()
126{
127    NextConnect = std::chrono::system_clock::now() +
128      std::chrono::duration<int64_t, std::milli>{ReconnectTimeMs.nextDelay()};
129}
130
131#ifdef DISCORD_DISABLE_IO_THREAD
132extern "C" DISCORD_EXPORT void Discord_UpdateConnection(void)
133#else
134static void Discord_UpdateConnection(void)
135#endif
136{
137    if (!Connection) {
138        return;
139    }
140
141    if (!Connection->IsOpen()) {
142        if (std::chrono::system_clock::now() >= NextConnect) {
143            UpdateReconnectTime();
144            Connection->Open();
145        }
146    }
147    else {
148        // reads
149
150        for (;;) {
151            JsonDocument message;
152
153            if (!Connection->Read(message)) {
154                break;
155            }
156
157            const char* evtName = GetStrMember(&message, "evt");
158            const char* nonce = GetStrMember(&message, "nonce");
159
160            if (nonce) {
161                // in responses only -- should use to match up response when needed.
162
163                if (evtName && strcmp(evtName, "ERROR") == 0) {
164                    auto data = GetObjMember(&message, "data");
165                    LastErrorCode = GetIntMember(data, "code");
166                    StringCopy(LastErrorMessage, GetStrMember(data, "message", ""));
167                    GotErrorMessage.store(true);
168                }
169            }
170            else {
171                // should have evt == name of event, optional data
172                if (evtName == nullptr) {
173                    continue;
174                }
175
176                auto data = GetObjMember(&message, "data");
177
178                if (strcmp(evtName, "ACTIVITY_JOIN") == 0) {
179                    auto secret = GetStrMember(data, "secret");
180                    if (secret) {
181                        StringCopy(JoinGameSecret, secret);
182                        WasJoinGame.store(true);
183                    }
184                }
185                else if (strcmp(evtName, "ACTIVITY_SPECTATE") == 0) {
186                    auto secret = GetStrMember(data, "secret");
187                    if (secret) {
188                        StringCopy(SpectateGameSecret, secret);
189                        WasSpectateGame.store(true);
190                    }
191                }
192                else if (strcmp(evtName, "ACTIVITY_JOIN_REQUEST") == 0) {
193                    auto user = GetObjMember(data, "user");
194                    auto userId = GetStrMember(user, "id");
195                    auto username = GetStrMember(user, "username");
196                    auto avatar = GetStrMember(user, "avatar");
197                    auto joinReq = JoinAskQueue.GetNextAddMessage();
198                    if (userId && username && joinReq) {
199                        StringCopy(joinReq->userId, userId);
200                        StringCopy(joinReq->username, username);
201                        auto discriminator = GetStrMember(user, "discriminator");
202                        if (discriminator) {
203                            StringCopy(joinReq->discriminator, discriminator);
204                        }
205                        if (avatar) {
206                            StringCopy(joinReq->avatar, avatar);
207                        }
208                        else {
209                            joinReq->avatar[0] = 0;
210                        }
211                        JoinAskQueue.CommitAdd();
212                    }
213                }
214            }
215        }
216
217        // writes
218        if (UpdatePresence.exchange(false) && QueuedPresence.length) {
219            QueuedMessage local;
220            {
221                std::lock_guard<std::mutex> guard(PresenceMutex);
222                local.Copy(QueuedPresence);
223            }
224            if (!Connection->Write(local.buffer, local.length)) {
225                // if we fail to send, requeue
226                std::lock_guard<std::mutex> guard(PresenceMutex);
227                QueuedPresence.Copy(local);
228                UpdatePresence.exchange(true);
229            }
230        }
231
232        while (SendQueue.HavePendingSends()) {
233            auto qmessage = SendQueue.GetNextSendMessage();
234            Connection->Write(qmessage->buffer, qmessage->length);
235            SendQueue.CommitSend();
236        }
237    }
238}
239
240static void SignalIOActivity()
241{
242    if (IoThread != nullptr) {
243        IoThread->Notify();
244    }
245}
246
247static bool RegisterForEvent(const char* evtName)
248{
249    auto qmessage = SendQueue.GetNextAddMessage();
250    if (qmessage) {
251        qmessage->length =
252          JsonWriteSubscribeCommand(qmessage->buffer, sizeof(qmessage->buffer), Nonce++, evtName);
253        SendQueue.CommitAdd();
254        SignalIOActivity();
255        return true;
256    }
257    return false;
258}
259
260static bool DeregisterForEvent(const char* evtName)
261{
262    auto qmessage = SendQueue.GetNextAddMessage();
263    if (qmessage) {
264        qmessage->length =
265          JsonWriteUnsubscribeCommand(qmessage->buffer, sizeof(qmessage->buffer), Nonce++, evtName);
266        SendQueue.CommitAdd();
267        SignalIOActivity();
268        return true;
269    }
270    return false;
271}
272
273extern "C" DISCORD_EXPORT void Discord_Initialize(const char* applicationId,
274                                                  DiscordEventHandlers* handlers,
275                                                  int autoRegister,
276                                                  const char* optionalSteamId)
277{
278    IoThread = new (std::nothrow) IoThreadHolder();
279    if (IoThread == nullptr) {
280        return;
281    }
282
283    if (autoRegister) {
284        if (optionalSteamId && optionalSteamId[0]) {
285            Discord_RegisterSteamGame(applicationId, optionalSteamId);
286        }
287        else {
288            Discord_Register(applicationId, nullptr);
289        }
290    }
291
292    Pid = GetProcessId();
293
294    {
295        std::lock_guard<std::mutex> guard(HandlerMutex);
296
297        if (handlers) {
298            QueuedHandlers = *handlers;
299        }
300        else {
301            QueuedHandlers = {};
302        }
303
304        Handlers = {};
305    }
306
307    if (Connection) {
308        return;
309    }
310
311    Connection = RpcConnection::Create(applicationId);
312    Connection->onConnect = [](JsonDocument& readyMessage) {
313        Discord_UpdateHandlers(&QueuedHandlers);
314        if (QueuedPresence.length > 0) {
315            UpdatePresence.exchange(true);
316            SignalIOActivity();
317        }
318        auto data = GetObjMember(&readyMessage, "data");
319        auto user = GetObjMember(data, "user");
320        auto userId = GetStrMember(user, "id");
321        auto username = GetStrMember(user, "username");
322        auto avatar = GetStrMember(user, "avatar");
323        if (userId && username) {
324            StringCopy(connectedUser.userId, userId);
325            StringCopy(connectedUser.username, username);
326            auto discriminator = GetStrMember(user, "discriminator");
327            if (discriminator) {
328                StringCopy(connectedUser.discriminator, discriminator);
329            }
330            if (avatar) {
331                StringCopy(connectedUser.avatar, avatar);
332            }
333            else {
334                connectedUser.avatar[0] = 0;
335            }
336        }
337        WasJustConnected.exchange(true);
338        ReconnectTimeMs.reset();
339    };
340    Connection->onDisconnect = [](int err, const char* message) {
341        LastDisconnectErrorCode = err;
342        StringCopy(LastDisconnectErrorMessage, message);
343        WasJustDisconnected.exchange(true);
344        UpdateReconnectTime();
345    };
346
347    IoThread->Start();
348}
349
350extern "C" DISCORD_EXPORT void Discord_Shutdown(void)
351{
352    if (!Connection) {
353        return;
354    }
355    Connection->onConnect = nullptr;
356    Connection->onDisconnect = nullptr;
357    Handlers = {};
358    QueuedPresence.length = 0;
359    UpdatePresence.exchange(false);
360    if (IoThread != nullptr) {
361        IoThread->Stop();
362        delete IoThread;
363        IoThread = nullptr;
364    }
365
366    RpcConnection::Destroy(Connection);
367}
368
369extern "C" DISCORD_EXPORT void Discord_UpdatePresence(const DiscordRichPresence* presence)
370{
371    {
372        std::lock_guard<std::mutex> guard(PresenceMutex);
373        QueuedPresence.length = JsonWriteRichPresenceObj(
374          QueuedPresence.buffer, sizeof(QueuedPresence.buffer), Nonce++, Pid, presence);
375        UpdatePresence.exchange(true);
376    }
377    SignalIOActivity();
378}
379
380extern "C" DISCORD_EXPORT void Discord_ClearPresence(void)
381{
382    Discord_UpdatePresence(nullptr);
383}
384
385extern "C" DISCORD_EXPORT void Discord_Respond(const char* userId, /* DISCORD_REPLY_ */ int reply)
386{
387    // if we are not connected, let's not batch up stale messages for later
388    if (!Connection || !Connection->IsOpen()) {
389        return;
390    }
391    auto qmessage = SendQueue.GetNextAddMessage();
392    if (qmessage) {
393        qmessage->length =
394          JsonWriteJoinReply(qmessage->buffer, sizeof(qmessage->buffer), userId, reply, Nonce++);
395        SendQueue.CommitAdd();
396        SignalIOActivity();
397    }
398}
399
400extern "C" DISCORD_EXPORT void Discord_RunCallbacks(void)
401{
402    // Note on some weirdness: internally we might connect, get other signals, disconnect any number
403    // of times inbetween calls here. Externally, we want the sequence to seem sane, so any other
404    // signals are book-ended by calls to ready and disconnect.
405
406    if (!Connection) {
407        return;
408    }
409
410    bool wasDisconnected = WasJustDisconnected.exchange(false);
411    bool isConnected = Connection->IsOpen();
412
413    if (isConnected) {
414        // if we are connected, disconnect cb first
415        std::lock_guard<std::mutex> guard(HandlerMutex);
416        if (wasDisconnected && Handlers.disconnected) {
417            Handlers.disconnected(LastDisconnectErrorCode, LastDisconnectErrorMessage);
418        }
419    }
420
421    if (WasJustConnected.exchange(false)) {
422        std::lock_guard<std::mutex> guard(HandlerMutex);
423        if (Handlers.ready) {
424            DiscordUser du{connectedUser.userId,
425                           connectedUser.username,
426                           connectedUser.discriminator,
427                           connectedUser.avatar};
428            Handlers.ready(&du);
429        }
430    }
431
432    if (GotErrorMessage.exchange(false)) {
433        std::lock_guard<std::mutex> guard(HandlerMutex);
434        if (Handlers.errored) {
435            Handlers.errored(LastErrorCode, LastErrorMessage);
436        }
437    }
438
439    if (WasJoinGame.exchange(false)) {
440        std::lock_guard<std::mutex> guard(HandlerMutex);
441        if (Handlers.joinGame) {
442            Handlers.joinGame(JoinGameSecret);
443        }
444    }
445
446    if (WasSpectateGame.exchange(false)) {
447        std::lock_guard<std::mutex> guard(HandlerMutex);
448        if (Handlers.spectateGame) {
449            Handlers.spectateGame(SpectateGameSecret);
450        }
451    }
452
453    // Right now this batches up any requests and sends them all in a burst; I could imagine a world
454    // where the implementer would rather sequentially accept/reject each one before the next invite
455    // is sent. I left it this way because I could also imagine wanting to process these all and
456    // maybe show them in one common dialog and/or start fetching the avatars in parallel, and if
457    // not it should be trivial for the implementer to make a queue themselves.
458    while (JoinAskQueue.HavePendingSends()) {
459        auto req = JoinAskQueue.GetNextSendMessage();
460        {
461            std::lock_guard<std::mutex> guard(HandlerMutex);
462            if (Handlers.joinRequest) {
463                DiscordUser du{req->userId, req->username, req->discriminator, req->avatar};
464                Handlers.joinRequest(&du);
465            }
466        }
467        JoinAskQueue.CommitSend();
468    }
469
470    if (!isConnected) {
471        // if we are not connected, disconnect message last
472        std::lock_guard<std::mutex> guard(HandlerMutex);
473        if (wasDisconnected && Handlers.disconnected) {
474            Handlers.disconnected(LastDisconnectErrorCode, LastDisconnectErrorMessage);
475        }
476    }
477}
478
479extern "C" DISCORD_EXPORT void Discord_UpdateHandlers(DiscordEventHandlers* newHandlers)
480{
481    if (newHandlers) {
482#define HANDLE_EVENT_REGISTRATION(handler_name, event)              \
483    if (!Handlers.handler_name && newHandlers->handler_name) {      \
484        RegisterForEvent(event);                                    \
485    }                                                               \
486    else if (Handlers.handler_name && !newHandlers->handler_name) { \
487        DeregisterForEvent(event);                                  \
488    }
489
490        std::lock_guard<std::mutex> guard(HandlerMutex);
491        HANDLE_EVENT_REGISTRATION(joinGame, "ACTIVITY_JOIN")
492        HANDLE_EVENT_REGISTRATION(spectateGame, "ACTIVITY_SPECTATE")
493        HANDLE_EVENT_REGISTRATION(joinRequest, "ACTIVITY_JOIN_REQUEST")
494
495#undef HANDLE_EVENT_REGISTRATION
496
497        Handlers = *newHandlers;
498    }
499    else {
500        std::lock_guard<std::mutex> guard(HandlerMutex);
501        Handlers = {};
502    }
503    return;
504}