import { Injectable, NotFoundException, Logger, OnModuleInit, Inject, forwardRef, } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository, In } from 'typeorm'; import { EventEmitter2 } from '@nestjs/event-emitter'; import { Response } from 'express'; import { LiveBlog, LiveBlogUpdate, LiveBlogStatus } from './entities'; import { StrapiService } from './strapi.service'; import { CreateLiveBlogDto, UpdateLiveBlogDto, FindLiveBlogsDto, CreateLiveBlogUpdateDto, UpdateLiveBlogUpdateDto, } from './articles.dto'; interface SseClient { id: string; response: Response; blogId: string; } interface LiveBlogUpdateEvent { blogId: string; update: LiveBlogUpdate; } interface LiveBlogStatusChangeEvent { blogId: string; status: LiveBlogStatus; } interface LiveBlogPinUpdateEvent { blogId: string; updateId: string; isPinned: boolean; } @Injectable() export class LiveBlogService implements OnModuleInit { private readonly logger = new Logger(LiveBlogService.name); private readonly sseClients = new Map(); constructor( @InjectRepository(LiveBlog) private readonly liveBlogRepository: Repository, @InjectRepository(LiveBlogUpdate) private readonly liveBlogUpdateRepository: Repository, private readonly eventEmitter: EventEmitter2, @Inject(forwardRef(() => StrapiService)) private readonly strapiService: StrapiService, ) {} onModuleInit() { this.eventEmitter.on('live-blog.update', (data: LiveBlogUpdateEvent) => { this.broadcastToClients(data.blogId, { type: 'update', data: data.update, }); }); this.eventEmitter.on( 'live-blog.status-change', (data: LiveBlogStatusChangeEvent) => { this.broadcastToClients(data.blogId, { type: 'status-change', data: { status: data.status }, }); }, ); this.eventEmitter.on( 'live-blog.pin-update', (data: LiveBlogPinUpdateEvent) => { this.broadcastToClients(data.blogId, { type: 'pin-update', data: { updateId: data.updateId, isPinned: data.isPinned }, }); }, ); } // Live Blog CRUD operations async create(dto: CreateLiveBlogDto): Promise { const liveBlog = this.liveBlogRepository.create({ ...dto, status: dto.status || LiveBlogStatus.DRAFT, }); return await this.liveBlogRepository.save(liveBlog); } async findAll( dto: FindLiveBlogsDto, ): Promise<{ data: LiveBlog[]; total: number }> { const { category, author, status, isPinned, search, page = 1, limit = 10, } = dto; const queryBuilder = this.liveBlogRepository .createQueryBuilder('liveBlog') .leftJoinAndSelect('liveBlog.author', 'author') .leftJoinAndSelect('liveBlog.category', 'category'); // Handle status filter - can be single value or comma-separated list if (status) { if (typeof status === 'string' && status.includes(',')) { const statuses = status.split(',').map((s) => s.trim()); queryBuilder.where('liveBlog.status IN (:...statuses)', { statuses }); } else { queryBuilder.where('liveBlog.status = :status', { status }); } } // If no status specified, return all live blogs (for admin dashboard) // Note: Pinned blogs query should still work without status filter if (category) { queryBuilder.andWhere('category.slug = :category', { category }); } if (author) { queryBuilder.andWhere('author.slug = :author', { author }); } if (search) { queryBuilder.andWhere( '(liveBlog.title ILIKE :search OR liveBlog.description ILIKE :search)', { search: `%${search}%` }, ); } if (isPinned !== undefined) { queryBuilder.andWhere('liveBlog.isPinned = :isPinned', { isPinned }); } const [data, total] = await queryBuilder .orderBy('liveBlog.createdAt', 'DESC') .skip((page - 1) * limit) .take(limit) .getManyAndCount(); return { data, total }; } async findOne(id: string): Promise { const liveBlog = await this.liveBlogRepository.findOne({ where: { id }, relations: ['author', 'category', 'updates'], }); if (!liveBlog) { throw new NotFoundException(`Live blog with ID ${id} not found`); } // Increment view count await this.liveBlogRepository.increment({ id }, 'viewCount', 1); return liveBlog; } async findOneWithoutIncrement(id: string): Promise { const liveBlog = await this.liveBlogRepository.findOne({ where: { id }, relations: ['author', 'category', 'updates'], }); if (!liveBlog) { throw new NotFoundException(`Live blog with ID ${id} not found`); } return liveBlog; } async findBySlug(slug: string): Promise { const liveBlog = await this.liveBlogRepository.findOne({ where: { slug }, relations: ['author', 'category', 'updates'], }); if (!liveBlog) { throw new NotFoundException(`Live blog with slug ${slug} not found`); } // Increment view count await this.liveBlogRepository.increment({ slug }, 'viewCount', 1); return liveBlog; } async update(id: string, dto: UpdateLiveBlogDto): Promise { const liveBlog = await this.liveBlogRepository.findOne({ where: { id }, relations: ['author', 'category'], }); if (!liveBlog) { throw new NotFoundException(`Live blog with ID ${id} not found`); } // Track if status changed for event emission const oldStatus = liveBlog.status; // Update fields if (dto.title !== undefined) { liveBlog.title = dto.title; } if (dto.slug !== undefined) { liveBlog.slug = dto.slug; } if (dto.description !== undefined) { liveBlog.description = dto.description; } if (dto.status !== undefined) { liveBlog.status = dto.status; } if (dto.strapiId !== undefined) { liveBlog.strapiId = dto.strapiId; } if (dto.authorId !== undefined) { liveBlog.authorId = dto.authorId; } if (dto.categoryId !== undefined) { liveBlog.categoryId = dto.categoryId; } if (dto.isPinned !== undefined) { liveBlog.isPinned = dto.isPinned; } // Save the updated entity const updatedBlog = await this.liveBlogRepository.save(liveBlog); // Emit status change event if status changed if (dto.status !== undefined && dto.status !== oldStatus) { this.eventEmitter.emit('live-blog.status-change', { blogId: id, status: dto.status, }); // Update Strapi if live blog has strapiId if (liveBlog.strapiId) { try { await this.strapiService.updateLiveBlogStatusInStrapi( liveBlog.strapiId, dto.status, ); } catch (error) { // Log error but don't fail - backend status is updated this.logger.error( `Failed to update Strapi status for live blog ${id}:`, error, ); } } } return updatedBlog; } async remove(id: string): Promise { const liveBlog = await this.findOne(id); // Delete from Strapi if live blog has strapiId if (liveBlog.strapiId) { try { await this.strapiService.deleteLiveBlogFromStrapi(liveBlog.strapiId); } catch (error) { // Log error but don't fail - we still want to delete from backend this.logger.error( `Failed to delete live blog ${id} from Strapi:`, error, ); } } await this.liveBlogRepository.remove(liveBlog); } async archive(id: string): Promise { const liveBlog = await this.findOne(id); liveBlog.status = LiveBlogStatus.ARCHIVED; const savedLiveBlog = await this.liveBlogRepository.save(liveBlog); // Update Strapi if live blog has strapiId if (liveBlog.strapiId) { try { await this.strapiService.updateLiveBlogStatusInStrapi( liveBlog.strapiId, LiveBlogStatus.ARCHIVED, ); } catch (error) { // Log error but don't fail - backend status is updated this.logger.error( `Failed to update Strapi status for live blog ${id}:`, error, ); } } return savedLiveBlog; } async publish( id: string, status: LiveBlogStatus = LiveBlogStatus.DRAFT, ): Promise { const liveBlog = await this.findOne(id); liveBlog.status = status; const savedLiveBlog = await this.liveBlogRepository.save(liveBlog); // Update Strapi if live blog has strapiId if (liveBlog.strapiId) { try { await this.strapiService.updateLiveBlogStatusInStrapi( liveBlog.strapiId, status, ); } catch (error) { // Log error but don't fail - backend status is updated this.logger.error( `Failed to update Strapi status for live blog ${id}:`, error, ); } } return savedLiveBlog; } // Live Blog Update CRUD operations async createUpdate( dto: CreateLiveBlogUpdateDto, liveBlogId: string, ): Promise { const liveBlogEntity = await this.findOne(liveBlogId); const update = this.liveBlogUpdateRepository.create({ ...dto, liveBlog: liveBlogEntity, }); const savedUpdate = await this.liveBlogUpdateRepository.save(update); // Emit update event this.eventEmitter.emit('live-blog.update', { blogId: liveBlogId, update: savedUpdate, }); return savedUpdate; } async findUpdates( liveBlogId: string, page = 1, limit = 50, ): Promise<{ data: LiveBlogUpdate[]; total: number }> { const [data, total] = await this.liveBlogUpdateRepository.findAndCount({ where: { liveBlog: { id: liveBlogId } }, relations: ['author'], order: { isPinned: 'DESC', createdAt: 'ASC', }, skip: (page - 1) * limit, take: limit, }); return { data, total }; } async updateUpdate( liveBlogId: string, updateId: string, dto: UpdateLiveBlogUpdateDto, ): Promise { const update = await this.liveBlogUpdateRepository.findOne({ where: { id: updateId, liveBlog: { id: liveBlogId } }, relations: ['liveBlog'], }); if (!update) { throw new NotFoundException(`Update with ID ${updateId} not found`); } Object.assign(update, dto); const savedUpdate = await this.liveBlogUpdateRepository.save(update); // Emit pin change event if (dto.isPinned !== undefined) { this.eventEmitter.emit('live-blog.pin-update', { blogId: liveBlogId, updateId, isPinned: dto.isPinned, }); } return savedUpdate; } async removeUpdate(liveBlogId: string, updateId: string): Promise { const update = await this.liveBlogUpdateRepository.findOne({ where: { id: updateId, liveBlog: { id: liveBlogId } }, }); if (!update) { throw new NotFoundException(`Update with ID ${updateId} not found`); } await this.liveBlogUpdateRepository.remove(update); } // SSE operations createStream(liveBlogId: string, response: Response): void { // Verify live blog exists and is live this.findOne(liveBlogId).catch(() => { response.end(); return; }); const clientId = `${Date.now()}-${Math.random()}`; // Set SSE headers response.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', Connection: 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Headers': 'Cache-Control', }); // Send initial connection message response.write( `data: ${JSON.stringify({ type: 'connected', clientId })}\n\n`, ); // Store client connection this.sseClients.set(clientId, { id: clientId, response, blogId: liveBlogId, }); // Send periodic keep-alive messages to prevent timeout const keepAliveInterval = setInterval(() => { try { response.write(`: keep-alive\n\n`); } catch { // Client disconnected, stop sending keep-alive clearInterval(keepAliveInterval); } }, 15000); // Send keep-alive every 15 seconds // Handle client disconnect response.on('close', () => { clearInterval(keepAliveInterval); this.sseClients.delete(clientId); this.logger.log( `Client ${clientId} disconnected from live blog ${liveBlogId}`, ); }); this.logger.log(`Client ${clientId} connected to live blog ${liveBlogId}`); } private broadcastToClients(liveBlogId: string, message: any): void { const clients = Array.from(this.sseClients.values()).filter( (client) => client.blogId === liveBlogId, ); clients.forEach((client) => { try { client.response.write(`data: ${JSON.stringify(message)}\n\n`); } catch (error) { this.logger.error( `Failed to send message to client ${client.id}:`, error, ); this.sseClients.delete(client.id); } }); this.logger.debug( `Broadcasted message to ${clients.length} clients for live blog ${liveBlogId}`, ); } // Strapi sync operations async syncFromStrapi( strapiId: string, data: Partial, ): Promise { // Use upsert to handle race conditions and ensure uniqueness try { // First try to find existing live blog by strapiId const liveBlog = await this.liveBlogRepository.findOne({ where: { strapiId }, }); if (liveBlog) { // Update existing live blog const currentStatus = liveBlog.status; Object.assign(liveBlog, data); // Preserve archived status if live blog is already archived in our system // unless Strapi explicitly sends a different status if ( currentStatus === LiveBlogStatus.ARCHIVED && data.status !== LiveBlogStatus.ARCHIVED ) { liveBlog.status = LiveBlogStatus.ARCHIVED; } return await this.liveBlogRepository.save(liveBlog); } else { // Create new live blog const newLiveBlog = this.liveBlogRepository.create({ strapiId, ...data, status: data.status || LiveBlogStatus.DRAFT, }); return await this.liveBlogRepository.save(newLiveBlog); } } catch (error: unknown) { // If we get a unique constraint violation, try to find and update again // This handles race conditions where two syncs happen simultaneously const dbError = error as { code?: string; constraint?: string }; if ( dbError.code === '23505' && dbError.constraint?.includes('strapiId') ) { this.logger.warn( `Race condition detected for strapiId ${strapiId}, retrying...`, ); // Wait a bit and retry await new Promise((resolve) => setTimeout(resolve, 100)); const existingLiveBlog = await this.liveBlogRepository.findOne({ where: { strapiId }, }); if (existingLiveBlog) { const currentStatus = existingLiveBlog.status; Object.assign(existingLiveBlog, data); if ( currentStatus === LiveBlogStatus.ARCHIVED && data.status !== LiveBlogStatus.ARCHIVED ) { existingLiveBlog.status = LiveBlogStatus.ARCHIVED; } return await this.liveBlogRepository.save(existingLiveBlog); } } // Re-throw other errors throw error; } } async removeByStrapiId(strapiId: string): Promise { const liveBlog = await this.liveBlogRepository.findOne({ where: { strapiId }, }); if (!liveBlog) { this.logger.warn(`LiveBlog with strapiId ${strapiId} not found`); return; } await this.liveBlogRepository.remove(liveBlog); this.logger.log( `Successfully deleted live blog with strapiId: ${strapiId}`, ); } // Utility methods async getLiveBlogsWithRecentUpdates(hours = 24): Promise { const since = new Date(); since.setHours(since.getHours() - hours); return await this.liveBlogRepository .createQueryBuilder('liveBlog') .leftJoinAndSelect('liveBlog.author', 'author') .leftJoinAndSelect('liveBlog.updates', 'updates') .where('liveBlog.status = :status', { status: LiveBlogStatus.LIVE }) .andWhere('updates.createdAt > :since', { since }) .orderBy('updates.createdAt', 'DESC') .getMany(); } async findPinned(): Promise { return this.liveBlogRepository.find({ where: { isPinned: true, status: In([LiveBlogStatus.LIVE, LiveBlogStatus.ENDED]), // Show both live and ended pinned blogs }, relations: ['author', 'category', 'updates'], order: { updatedAt: 'DESC' }, take: 5, }); } async findActive(): Promise { return this.liveBlogRepository.find({ where: { status: LiveBlogStatus.LIVE }, relations: ['author', 'category'], order: { createdAt: 'DESC' }, take: 20, }); } }