From 573238f65e70872113acb3e3c79e477b090892f9 Mon Sep 17 00:00:00 2001 From: echo Date: Fri, 3 Apr 2026 05:09:55 +0200 Subject: [PATCH] live sync working --- apps/admin/data/fitai.db | Bin 360448 -> 360448 bytes apps/admin/src/app/chat/page.tsx | 194 ++++++++++++++-------- apps/mobile/src/contexts/ChatContext.tsx | 195 ++++++++++++++++------- 3 files changed, 266 insertions(+), 123 deletions(-) diff --git a/apps/admin/data/fitai.db b/apps/admin/data/fitai.db index cbd7a628ed2d64d2653c423645a5a3a39810f6cf..502567513dae686a0d6b3cf810c02d3cc6251cc2 100644 GIT binary patch delta 1347 zcmZ{k&rj1}7{?2|-PV=8F9*RmM7l&04iMU|KUNYDLUbX9i}9xM-GCN4wiVe5ikZ*@ z7en}Qv3E6`NIVeXBxpPt5_k84@n|sNFHqmI30tSPC(k#}=Xv{mo~Ns;9jmJyTb)g% z?dxMrB`39(YFbM*uccbvrNB!oJdiZY?c-*7Ah|yenC>gGDvEJU#goRIoi`SEq71oR zo1jdTxGX6WP79`HDS}EAYkvy>F3K{e6k38m;e$vya=Z1QwU^ywgW(c$x)*Ev!t^?j zWk-_}-R;yEpr~A71`Dbx3kg5b-Ntm6`|T~@2JYB~ZOqM>Ge!MLF=tJhS^cs|l2ip- z8F5J|dJlJkVfXA1w72=+e!&a4N1Grz5sSq z1GzcM!d(h|N1xCew1y0nM7@ZCKj1Ok-HUM>aOFnJXw(1qlJ(2=CldC_zC~gDa{aJF zM1EA;+&7@0uiR&GpkDTyMtidRdjmw>*JY57vXLPQjldN+92r6%k%K7sA@VD77N z=#ID}CqP8xE^AJVb)6F+kYw$pgsaKA9cTU~YnL%pW!5cGu$ru!exOobKScUG>5d-) zcyQ|k9E~)SKNUv | null>(null); const socketRef = useRef(null); + const getTokenRef = useRef(getToken); + const activeThreadIdRef = useRef(null); + const subscribedThreadIdsRef = useRef>(new Set()); + const loadingThreadsRef = useRef(false); + const loadingMessagesRef = useRef>({}); const activeMessages = useMemo( () => (activeThreadId ? (messagesByThreadId[activeThreadId] ?? []) : []), @@ -69,71 +74,97 @@ export default function AdminChatPage() { ); useEffect(() => { - const loadThreads = async () => { - const token = await getToken(); - if (!token) { - return; - } + getTokenRef.current = getToken; + }, [getToken]); - try { + useEffect(() => { + activeThreadIdRef.current = activeThreadId; + }, [activeThreadId]); + + const loadThreads = async (showLoader = true) => { + if (loadingThreadsRef.current) { + return; + } + + const token = await getTokenRef.current(); + if (!token) { + return; + } + + loadingThreadsRef.current = true; + try { + if (showLoader) { setLoadingThreads(true); - const response = await axios.get<{ threads: ChatThread[] }>( - "/api/chat/threads", - { - headers: { Authorization: `Bearer ${token}` }, - }, - ); + } + const response = await axios.get<{ threads: ChatThread[] }>( + "/api/chat/threads", + { + headers: { Authorization: `Bearer ${token}` }, + }, + ); - setThreads(response.data.threads); - setActiveThreadId( - (prev) => prev ?? response.data.threads[0]?.id ?? null, - ); - setReadByThreadId({}); - } finally { + setThreads(response.data.threads); + setActiveThreadId((prev) => prev ?? response.data.threads[0]?.id ?? null); + setReadByThreadId({}); + } finally { + if (showLoader) { setLoadingThreads(false); } - }; + loadingThreadsRef.current = false; + } + }; - void loadThreads(); - }, [getToken]); + const loadMessages = async (threadId: string, showLoader = true) => { + if (loadingMessagesRef.current[threadId]) { + return; + } + + const token = await getTokenRef.current(); + if (!token) { + return; + } + + loadingMessagesRef.current[threadId] = true; + try { + if (showLoader) { + setLoadingMessages(true); + } + const response = await axios.get<{ messages: ChatMessage[] }>( + `/api/chat/threads/${threadId}/messages`, + { + headers: { Authorization: `Bearer ${token}` }, + }, + ); + + setMessagesByThreadId((prev) => ({ + ...prev, + [threadId]: response.data.messages, + })); + } finally { + if (showLoader) { + setLoadingMessages(false); + } + loadingMessagesRef.current[threadId] = false; + } + }; + + useEffect(() => { + void loadThreads(true); + }, []); useEffect(() => { if (!activeThreadId) { return; } - const loadMessages = async () => { - const token = await getToken(); - if (!token) { - return; - } - - try { - setLoadingMessages(true); - const response = await axios.get<{ messages: ChatMessage[] }>( - `/api/chat/threads/${activeThreadId}/messages`, - { - headers: { Authorization: `Bearer ${token}` }, - }, - ); - - setMessagesByThreadId((prev) => ({ - ...prev, - [activeThreadId]: response.data.messages, - })); - } finally { - setLoadingMessages(false); - } - }; - - void loadMessages(); - }, [activeThreadId, getToken]); + void loadMessages(activeThreadId, true); + }, [activeThreadId]); useEffect(() => { let mounted = true; const setupSocket = async () => { - const token = await getToken(); + const token = await getTokenRef.current(); if (!token || !mounted) { return; } @@ -151,14 +182,14 @@ export default function AdminChatPage() { socket.on("connect", () => { setSocketConnected(true); - if (activeThreadId) { - socket.emit("chat:subscribe", { threadId: activeThreadId }); - } + subscribedThreadIdsRef.current.clear(); + void loadThreads(false); }); socket.on("disconnect", () => { setSocketConnected(false); setTypingByThreadId({}); + subscribedThreadIdsRef.current.clear(); }); socket.on( @@ -185,7 +216,7 @@ export default function AdminChatPage() { lastMessageAt: event.message.createdAt, lastMessageBody: event.message.body, unreadCount: - event.threadId === activeThreadId || + event.threadId === activeThreadIdRef.current || event.message.senderUserId === userId ? thread.unreadCount : thread.unreadCount + 1, @@ -263,6 +294,8 @@ export default function AdminChatPage() { [event.userId!]: event.lastReadMessageId!, }, })); + + void loadThreads(false); }); socket.on("chat:error", (event: { code?: string; message?: string }) => { @@ -284,28 +317,57 @@ export default function AdminChatPage() { socketRef.current = null; } }; - }, [activeThreadId, getToken, userId]); + }, [userId]); useEffect(() => { - if (!activeThreadId || !socketRef.current || !socketConnected) { + if (!socketRef.current || !socketConnected) { + return; + } + + const socket = socketRef.current; + const nextIds = new Set(threads.map((thread) => thread.id)); + + threads.forEach((thread) => { + if (!subscribedThreadIdsRef.current.has(thread.id)) { + socket.emit("chat:subscribe", { threadId: thread.id }); + subscribedThreadIdsRef.current.add(thread.id); + } + }); + + Array.from(subscribedThreadIdsRef.current).forEach((threadId) => { + if (!nextIds.has(threadId)) { + socket.emit("chat:unsubscribe", { threadId }); + subscribedThreadIdsRef.current.delete(threadId); + } + }); + }, [socketConnected, threads]); + + useEffect(() => { + if (!activeThreadId) { return; } - socketRef.current.emit("chat:subscribe", { threadId: activeThreadId }); setThreads((prev) => prev.map((thread) => thread.id === activeThreadId ? { ...thread, unreadCount: 0 } : thread, ), ); + }, [activeThreadId]); - return () => { - if (socketRef.current) { - socketRef.current.emit("chat:unsubscribe", { - threadId: activeThreadId, - }); - } - }; - }, [activeThreadId, socketConnected]); + useEffect(() => { + if (!socketConnected) { + const interval = setInterval(() => { + void loadThreads(false); + if (activeThreadIdRef.current) { + void loadMessages(activeThreadIdRef.current, false); + } + }, 5000); + + return () => { + clearInterval(interval); + }; + } + }, [socketConnected]); useEffect(() => { if (!activeThreadId) { @@ -319,7 +381,7 @@ export default function AdminChatPage() { } const markRead = async () => { - const token = await getToken(); + const token = await getTokenRef.current(); if (!token) { return; } @@ -351,7 +413,7 @@ export default function AdminChatPage() { }; void markRead(); - }, [activeThreadId, getToken, messagesByThreadId, socketConnected, userId]); + }, [activeThreadId, messagesByThreadId, socketConnected, userId]); const onChangeDraft = (value: string) => { setDraft(value); @@ -419,7 +481,7 @@ export default function AdminChatPage() { return; } - const token = await getToken(); + const token = await getTokenRef.current(); if (!token) { return; } diff --git a/apps/mobile/src/contexts/ChatContext.tsx b/apps/mobile/src/contexts/ChatContext.tsx index 0d39fad..1940070 100644 --- a/apps/mobile/src/contexts/ChatContext.tsx +++ b/apps/mobile/src/contexts/ChatContext.tsx @@ -74,16 +74,26 @@ export function ChatProvider({ children }: { children: React.ReactNode }) { >({}); const socketRef = useRef(null); + const getTokenRef = useRef(getToken); const lastMarkedReadByThreadRef = useRef>({}); const activeThreadIdRef = useRef(null); const currentUserIdRef = useRef(undefined); - const refreshThreadsRef = useRef<() => Promise>(async () => {}); - const refreshMessagesRef = useRef<(threadId: string) => Promise>( + const subscribedThreadIdsRef = useRef>(new Set()); + const refreshThreadsRef = useRef<(showLoader?: boolean) => Promise>( async () => {}, ); + const refreshMessagesRef = useRef< + (threadId: string, showLoader?: boolean) => Promise + >(async () => {}); const markThreadReadRef = useRef< (threadId: string, lastReadMessageId?: string) => Promise >(async () => {}); + const refreshingThreadsRef = useRef(false); + const refreshingMessagesRef = useRef>({}); + + useEffect(() => { + getTokenRef.current = getToken; + }, [getToken]); const clearAll = useCallback(() => { setThreads([]); @@ -97,6 +107,7 @@ export function ChatProvider({ children }: { children: React.ReactNode }) { setSocketConnected(false); setTypingByThreadId({}); lastMarkedReadByThreadRef.current = {}; + subscribedThreadIdsRef.current.clear(); if (socketRef.current) { socketRef.current.disconnect(); @@ -104,56 +115,80 @@ export function ChatProvider({ children }: { children: React.ReactNode }) { } }, []); - const refreshThreads = useCallback(async () => { - if (!isSignedIn) { - return; - } - - const token = await getToken(); - if (!token) { - return; - } - - try { - setLoadingThreads(true); - if (isClientUser) { - try { - await chatApi.getMyDmThread(token); - } catch { - // ignore bootstrap errors for now - } - } - - const data = await chatApi.getThreads(token); - setThreads(data); - - setActiveThreadId((prev) => { - if (prev && data.some((thread) => thread.id === prev)) { - return prev; - } - - return data.length > 0 ? data[0].id : null; - }); - } catch (error) { - log.warn("Failed to refresh chat threads", { error }); - } finally { - setLoadingThreads(false); - } - }, [getToken, isSignedIn, isClientUser]); - - const refreshMessages = useCallback( - async (threadId: string) => { + const refreshThreads = useCallback( + async (showLoader = true) => { if (!isSignedIn) { return; } - const token = await getToken(); + if (refreshingThreadsRef.current) { + return; + } + + refreshingThreadsRef.current = true; + + const token = await getTokenRef.current(); if (!token) { + refreshingThreadsRef.current = false; return; } try { - setLoadingMessages(true); + if (showLoader) { + setLoadingThreads(true); + } + if (isClientUser) { + try { + await chatApi.getMyDmThread(token); + } catch { + // ignore bootstrap errors for now + } + } + + const data = await chatApi.getThreads(token); + setThreads(data); + + setActiveThreadId((prev) => { + if (prev && data.some((thread) => thread.id === prev)) { + return prev; + } + + return data.length > 0 ? data[0].id : null; + }); + } catch (error) { + log.warn("Failed to refresh chat threads", { error }); + } finally { + if (showLoader) { + setLoadingThreads(false); + } + refreshingThreadsRef.current = false; + } + }, + [isSignedIn, isClientUser], + ); + + const refreshMessages = useCallback( + async (threadId: string, showLoader = true) => { + if (!isSignedIn) { + return; + } + + if (refreshingMessagesRef.current[threadId]) { + return; + } + + refreshingMessagesRef.current[threadId] = true; + + const token = await getTokenRef.current(); + if (!token) { + refreshingMessagesRef.current[threadId] = false; + return; + } + + try { + if (showLoader) { + setLoadingMessages(true); + } const response = await chatApi.getThreadMessages(threadId, token); setMessagesByThreadId((prev) => ({ ...prev, @@ -172,10 +207,13 @@ export function ChatProvider({ children }: { children: React.ReactNode }) { } catch (error) { log.warn("Failed to refresh chat messages", { threadId, error }); } finally { - setLoadingMessages(false); + if (showLoader) { + setLoadingMessages(false); + } + refreshingMessagesRef.current[threadId] = false; } }, - [getToken, isSignedIn], + [isSignedIn], ); const loadOlderMessages = useCallback( @@ -193,7 +231,7 @@ export function ChatProvider({ children }: { children: React.ReactNode }) { return; } - const token = await getToken(); + const token = await getTokenRef.current(); if (!token) { return; } @@ -235,7 +273,7 @@ export function ChatProvider({ children }: { children: React.ReactNode }) { setLoadingOlderByThreadId((prev) => ({ ...prev, [threadId]: false })); } }, - [getToken, isSignedIn, loadingOlderByThreadId, nextCursorByThreadId], + [isSignedIn, loadingOlderByThreadId, nextCursorByThreadId], ); const markThreadRead = useCallback( @@ -244,7 +282,7 @@ export function ChatProvider({ children }: { children: React.ReactNode }) { return; } - const token = await getToken(); + const token = await getTokenRef.current(); if (!token) { return; } @@ -271,7 +309,7 @@ export function ChatProvider({ children }: { children: React.ReactNode }) { log.warn("Failed to mark thread read", { threadId, error }); } }, - [getToken, isSignedIn, user?.id], + [isSignedIn, user?.id], ); useEffect(() => { @@ -301,7 +339,7 @@ export function ChatProvider({ children }: { children: React.ReactNode }) { return; } - const token = await getToken(); + const token = await getTokenRef.current(); if (!token) { return; } @@ -360,7 +398,7 @@ export function ChatProvider({ children }: { children: React.ReactNode }) { throw error; } }, - [getToken, isSignedIn, socketConnected, user?.id], + [isSignedIn, socketConnected, user?.id], ); useEffect(() => { @@ -372,7 +410,7 @@ export function ChatProvider({ children }: { children: React.ReactNode }) { let mounted = true; const setupSocket = async () => { - const token = await getToken(); + const token = await getTokenRef.current(); if (!token || !mounted) { return; } @@ -382,7 +420,7 @@ export function ChatProvider({ children }: { children: React.ReactNode }) { socket.on("connect", () => { setSocketConnected(true); - void refreshThreadsRef.current(); + void refreshThreadsRef.current(false); if (activeThreadIdRef.current) { socket.emit("chat:subscribe", { @@ -394,6 +432,7 @@ export function ChatProvider({ children }: { children: React.ReactNode }) { socket.on("disconnect", () => { setSocketConnected(false); setTypingByThreadId({}); + subscribedThreadIdsRef.current.clear(); }); socket.on( @@ -460,6 +499,8 @@ export function ChatProvider({ children }: { children: React.ReactNode }) { : thread, ), ); + } else if (event.threadId !== activeThreadIdRef.current) { + void refreshThreadsRef.current(false); } }, ); @@ -523,7 +564,7 @@ export function ChatProvider({ children }: { children: React.ReactNode }) { }, })); - void refreshThreadsRef.current(); + void refreshThreadsRef.current(false); }, ); @@ -575,14 +616,14 @@ export function ChatProvider({ children }: { children: React.ReactNode }) { socketRef.current = null; } }; - }, [clearAll, getToken, isSignedIn]); + }, [clearAll, isSignedIn]); useEffect(() => { if (!isSignedIn) { return; } - void refreshThreadsRef.current(); + void refreshThreadsRef.current(true); }, [isSignedIn, userRole]); useEffect(() => { @@ -594,7 +635,7 @@ export function ChatProvider({ children }: { children: React.ReactNode }) { socketRef.current.emit("chat:subscribe", { threadId: activeThreadId }); } - void refreshMessagesRef.current(activeThreadId); + void refreshMessagesRef.current(activeThreadId, true); setThreads((prev) => prev.map((thread) => @@ -611,6 +652,46 @@ export function ChatProvider({ children }: { children: React.ReactNode }) { }; }, [activeThreadId, socketConnected]); + useEffect(() => { + if (!socketRef.current || !socketConnected) { + return; + } + + const socket = socketRef.current; + const nextIds = new Set(threads.map((thread) => thread.id)); + + threads.forEach((thread) => { + if (!subscribedThreadIdsRef.current.has(thread.id)) { + socket.emit("chat:subscribe", { threadId: thread.id }); + subscribedThreadIdsRef.current.add(thread.id); + } + }); + + Array.from(subscribedThreadIdsRef.current).forEach((threadId) => { + if (!nextIds.has(threadId)) { + socket.emit("chat:unsubscribe", { threadId }); + subscribedThreadIdsRef.current.delete(threadId); + } + }); + }, [socketConnected, threads]); + + useEffect(() => { + if (!isSignedIn || socketConnected) { + return; + } + + const interval = setInterval(() => { + void refreshThreadsRef.current(false); + if (activeThreadIdRef.current) { + void refreshMessagesRef.current(activeThreadIdRef.current, false); + } + }, 5000); + + return () => { + clearInterval(interval); + }; + }, [isSignedIn, socketConnected]); + useEffect(() => { if (!activeThreadId) { return;