API & SSE Service

Overview

The API service is a hybrid NestJS application that acts as both an HTTP server (REST endpoints) and a Kafka consumer. It consumes processed comments from Kafka, reads existing comments from PostgreSQL (populated by the consumer service), serves them via a REST API, and broadcasts new comments in real time using Server-Sent Events (SSE).

Purpose

  • Provide REST endpoints for dashboard data fetching
  • Stream real-time comment updates via Server-Sent Events
  • Consume processed-comments from Kafka
  • Calculate aggregate statistics (for REST endpoint)
  • Bridge between backend processing and frontend UI

Architecture

┌──────────────────────┐
│  Kafka Topic         │
│  (processed-comments)│
└──────────┬───────────┘
           │
           ▼
┌──────────────────────┐
│  Kafka Consumer      │
│  Service             │
└──────────┬───────────┘
           │
           ▼
     ┌─────────┐
     │   SSE   │
     │ Service │
     └────┬────┘
          │
          ▼
   ┌────────────┐
   │ Subject    │
   │ (RxJS)     │
   └────┬───────┘
        │
        ▼
┌───────────────────────┐       ┌──────────────┐
│  HTTP Endpoints       │◄──────┤  PostgreSQL  │
│  - GET /api/comments  │ reads │  (populated  │
│  - GET /api/statistics│       │  by consumer)│
│  - GET /api/sse/...   │       └──────────────┘
│  - GET /health        │
└───────────────────────┘

Key Components

1. Kafka Consumer Service

Location: api/src/kafka-consumer.service.ts

Purpose: Listens to processed-comments topic and streams to SSE clients

Implementation:
@Controller()
export class KafkaConsumerService {
  constructor(private readonly sseService: SseService) {}

  @EventPattern(KAFKA_TOPICS.PROCESSED_COMMENTS)
  async handleProcessedComment(@Payload() data: ProcessedComment) {
    // Comment is already saved by the consumer service;
    // the API only needs to broadcast it via SSE for real-time updates
    this.sseService.emitComment(data)
  }
}
Why Kafka consumer in API:
  • Receives processed comments for real-time streaming
  • Comments already persisted by consumer service
  • Decouples API from consumer processing
  • Multiple API instances can stream same topic
  • Event-driven updates without database polling

Note: The API does not re-save comments to the database. The consumer service has already persisted them; the API's role is purely streaming via SSE for real-time dashboard updates.

2. Comments Service

Location: api/src/comments.service.ts

Query operations:
@Injectable()
export class CommentsService {
 
  async findAll(
    page: number = 1,
    pageSize: number = 20,
    tag?: CommentTag,
    search?: string,
  ): Promise<PaginatedResponse<ProcessedComment>> {
    const skip = (page - 1) * pageSize
 
    const where: FindOptionsWhere<ProcessedComment> = {}
 
    if (tag) {
      where.tag = tag
    }
 
    if (search) {
      where.text = ILike(`%${search}%`)
    }
 
    const [data, total] = await this.commentRepository.findAndCount({
      where,
      skip,
      take: pageSize,
      order: {
        processedAt: 'DESC',
      },
    })
 
    return {
      data,
      total,
      page,
      pageSize,
      totalPages: Math.ceil(total / pageSize),
    }
  }
 
  async getStatistics(): Promise<CommentStatistics> {
    const total = await this.commentRepository.count()
 
    // Get counts by tag
    const tagCounts = await this.commentRepository
      .createQueryBuilder('comment')
      .select('comment.tag', 'tag')
      .addSelect('COUNT(*)', 'count')
      .groupBy('comment.tag')
      .getRawMany()
 
    const byTag = {
      [CommentTag.POSITIVE]: 0,
      [CommentTag.NEGATIVE]: 0,
      [CommentTag.NEUTRAL]: 0,
      [CommentTag.UNRELATED]: 0,
    }
 
    tagCounts.forEach((row) => {
      byTag[row.tag as CommentTag] = parseInt(row.count, 10)
    })
 
    // Get recent count (last hour)
    const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000)
    const recentCount = await this.commentRepository.count({
      where: {
        processedAt: MoreThanOrEqual(oneHourAgo),
      },
    })
 
    return {
      total,
      byTag,
      recentCount,
    }
  }
}
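One subtlety in the aggregation above: PostgreSQL returns `COUNT(*)` as a bigint, which most drivers surface as a string, hence the `parseInt`. A minimal, dependency-free sketch of the fold, using hypothetical raw rows in the shape `getRawMany()` would return:

```typescript
// Hypothetical raw rows from the group-by query:
// the driver delivers bigint counts as strings
type RawTagCount = { tag: string; count: string }

const rows: RawTagCount[] = [
  { tag: 'positive', count: '456' },
  { tag: 'negative', count: '123' },
]

// Start every tag at zero so missing groups still appear in the response
const byTag: Record<string, number> = {
  positive: 0,
  negative: 0,
  neutral: 0,
  unrelated: 0,
}

for (const row of rows) {
  byTag[row.tag] = parseInt(row.count, 10)
}

console.log(byTag)
// { positive: 456, negative: 123, neutral: 0, unrelated: 0 }
```

Pre-seeding the record with zeros matters: a tag with no rows produces no group in SQL, but the dashboard still expects the key.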

3. SSE Service

Location: api/src/sse.service.ts

Purpose: Manages Server-Sent Events broadcasting to connected clients

Implementation using RxJS:
interface SseEvent {
  data: any
  type?: string
}
 
@Injectable()
export class SseService {
  private readonly commentSubject = new Subject<ProcessedComment>()
  private readonly commentStream$: Observable<SseEvent>
 
  constructor() {
    this.commentStream$ = this.commentSubject.asObservable().pipe(
      map((comment) => ({
        data: comment,
        type: 'comment',
      })),
      share()
    )
  }
 
  getCommentStream(): Observable<SseEvent> {
    return this.commentStream$
  }
 
  emitComment(comment: ProcessedComment): void {
    this.commentSubject.next(comment)
  }
}
Why RxJS Subject:
  • Multiple clients can subscribe
  • Broadcast to all connections
  • Automatic cleanup when connections close
  • Reactive programming pattern
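The broadcast behaviour can be shown without NestJS or RxJS. Below is a minimal, dependency-free stand-in (hypothetical `MiniSubject`, not the rxjs implementation) illustrating the pattern: every active subscriber receives each emitted value, and unsubscribing removes a client from the fan-out:

```typescript
// Minimal stand-in for an RxJS Subject: every subscriber registered
// at the time next() is called receives the emitted value
class MiniSubject<T> {
  private listeners: Array<(value: T) => void> = []

  subscribe(listener: (value: T) => void): () => void {
    this.listeners.push(listener)
    // Return an unsubscribe function for cleanup when a client disconnects
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener)
    }
  }

  next(value: T): void {
    for (const listener of this.listeners) listener(value)
  }
}

const comments = new MiniSubject<{ commentId: string; tag: string }>()

const clientA: string[] = []
const clientB: string[] = []
comments.subscribe((c) => clientA.push(c.commentId))
const unsubB = comments.subscribe((c) => clientB.push(c.commentId))

comments.next({ commentId: 'abc-123', tag: 'positive' }) // both clients receive it
unsubB() // client B disconnects
comments.next({ commentId: 'def-456', tag: 'negative' }) // only client A receives it

console.log(clientA) // ['abc-123', 'def-456']
console.log(clientB) // ['abc-123']
```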

4. HTTP Controllers

Comments Controller

Location: api/src/comments.controller.ts

Endpoint: GET /api/comments

Query Parameters:
  • page (default: 1)
  • pageSize (default: 20)
  • tag (optional: positive | negative | neutral | unrelated)
  • search (optional: text search)
Example request:
GET /api/comments?page=2&pageSize=10&tag=positive&search=food
Response:
{
  "data": [
    {
      "id": 123,
      "commentId": "abc-123",
      "text": "Amazing food!",
      "textHash": "3a2f1b4e...",
      "tag": "positive",
      "source": "instagram",
      "processedAt": "2026-03-26T10:30:00.000Z",
      "consumerId": "consumer-1234567890",
      "retryCount": 0
    }
  ],
  "total": 156,
  "page": 2,
  "pageSize": 10,
  "totalPages": 16
}
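The pagination fields in the response follow directly from `page`, `pageSize`, and `total`. A quick sketch of the arithmetic the service applies (hypothetical helper name):

```typescript
// Compute the SQL offset and the page count for a paginated query
function paginate(page: number, pageSize: number, total: number) {
  const skip = (page - 1) * pageSize // rows to skip (OFFSET)
  const totalPages = Math.ceil(total / pageSize)
  return { skip, take: pageSize, totalPages }
}

console.log(paginate(2, 10, 156))
// { skip: 10, take: 10, totalPages: 16 }
```

This matches the example response above: page 2 of 156 results at 10 per page skips the first 10 rows and yields 16 pages.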

Statistics Controller

Location: api/src/statistics.controller.ts

Endpoint: GET /api/statistics

Response:
{
  "total": 1523,
  "byTag": {
    "positive": 456,
    "negative": 123,
    "neutral": 789,
    "unrelated": 155
  },
  "recentCount": 234
}

SSE Controller

Location: api/src/sse.controller.ts

Endpoint: GET /api/sse/comments

Response: Event stream (text/event-stream)

event: comment
data: {"commentId":"abc-123","text":"Great!","tag":"positive",...}
 
event: comment
data: {"commentId":"def-456","text":"Terrible!","tag":"negative",...}
Implementation:
@Controller('api/sse')
export class SseController {
  constructor(private readonly sseService: SseService) {}

  @Sse('comments')
  streamComments(): Observable<any> {
    // Heartbeat to keep the connection alive through proxies
    const heartbeat$ = interval(30000).pipe(
      map(() => ({ data: { type: 'heartbeat' }, type: 'heartbeat' }))
    )

    // Merge the comment stream with the heartbeat
    return merge(this.sseService.getCommentStream(), heartbeat$)
  }
}
SSE Format:
  • event: comment - Comment event type
  • data: <json> - JSON payload
  • Empty line - Message separator
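The wire format above is simple enough to serialize by hand; a sketch of how one frame is assembled (illustrative only — NestJS's @Sse() handles this serialization itself):

```typescript
// Serialize one Server-Sent Events frame: an optional event line,
// a data line carrying the JSON payload, and a blank line as separator
function toSseFrame(payload: unknown, eventType?: string): string {
  const lines: string[] = []
  if (eventType) lines.push(`event: ${eventType}`)
  lines.push(`data: ${JSON.stringify(payload)}`)
  return lines.join('\n') + '\n\n'
}

const frame = toSseFrame({ commentId: 'abc-123', tag: 'positive' }, 'comment')
console.log(JSON.stringify(frame))
// "event: comment\ndata: {\"commentId\":\"abc-123\",\"tag\":\"positive\"}\n\n"
```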

5. Health Endpoint

Endpoint: GET /health

Response:
{
  "status": "healthy",
  "service": "api",
  "statistics": {
    "total": 1523,
    "byTag": {
      "positive": 456,
      "negative": 123,
      "neutral": 789,
      "unrelated": 155
    },
    "recentCount": 234
  },
  "timestamp": "2026-03-26T10:30:00.000Z"
}

Behavior Details

SSE Connection Flow

Client (Dashboard) side:
const eventSource = new EventSource('http://localhost:3001/api/sse/comments')
 
eventSource.addEventListener('comment', (event) => {
  const comment = JSON.parse(event.data)
  // Validate with schema and insert into TanStack DB collection
})
 
eventSource.onerror = () => {
  // Browser automatically reconnects
}
Server side:
// When comment processed:
this.sseService.emitComment(comment)
// → Sent to all connected clients

Message Flow

Complete flow from Kafka to Dashboard:
  1. Consumer publishes to processed-comments topic:

    {"commentId":"abc","text":"Great!","tag":"positive",...}
  2. API's Kafka consumer receives message

  3. API emits SSE event:

    event: comment
    data: {"commentId":"abc",...}
  4. All connected browsers receive event

  5. Dashboard updates UI in real-time

Note: The API does not save to the database. The consumer service already persisted the comment; the API only streams it via SSE for real-time updates.

CORS Configuration

Enabled for dashboard origin:
app.enableCors({
  origin: process.env.API_CORS_ORIGIN || 'http://localhost:3000',
  credentials: true,
})
Why CORS:
  • Dashboard runs on different port (3000)
  • API runs on port 3001
  • Browser requires CORS headers

Hybrid Application

Bootstrap: api/src/main.ts

async function bootstrap() {
  const app = await NestFactory.create(AppModule)
  
  // Enable CORS for dashboard
  app.enableCors({
    origin: process.env.API_CORS_ORIGIN || 'http://localhost:3000',
    credentials: true,
  })
  
  // Add Kafka microservice
  app.connectMicroservice<MicroserviceOptions>({
    transport: Transport.KAFKA,
    options: {
      client: {
        clientId: process.env.KAFKA_CLIENT_ID || 'api',
        brokers: [process.env.KAFKA_BROKER || 'localhost:9092'],
      },
      consumer: {
        groupId: KAFKA_CONSUMER_GROUPS.API,
        sessionTimeout: 30000,
        heartbeatInterval: 3000,
      },
      subscribe: {
        fromBeginning: false, // Only new messages for API service
      },
    },
  })
  
  await app.startAllMicroservices()
  
  const port = process.env.API_PORT || 3001
  await app.listen(port)
}
Why hybrid:
  • Single service handles both concerns
  • Shared database connection
  • Simpler deployment
  • Direct coordination between Kafka and HTTP

Performance Characteristics

REST Endpoints:
  • /api/comments: 20-50ms (database query)
  • /api/statistics: 10-30ms (cached or aggregated)
SSE:
  • Connection overhead: < 10ms
  • Event delivery: < 5ms
  • Concurrent connections: 100+ supported
Kafka consumption:
  • Lag: Typically 0-10 messages
  • Processing: 5-20ms per message

Monitoring

Logs:
[API] Kafka consumer microservice started
[API] Subscribed to topic: processed-comments
[API] API service is running on: http://localhost:3001
[API] Received processed comment: abc-123... [positive]
[API] New SSE client connected
[API] Emitted SSE event for comment: abc-123...
Metrics:
  • Total comments in database
  • SSE connections active
  • Kafka consumer lag
  • REST endpoint latencies
  • Events emitted per second

Development

Run locally:
cd api
pnpm dev
Environment variables:
API_PORT=3001
API_CORS_ORIGIN=http://localhost:3000
KAFKA_BROKER=localhost:9092
POSTGRES_HOST=localhost

Testing

Test REST Endpoints

# Get comments
curl http://localhost:3001/api/comments
 
# Get with filters
curl "http://localhost:3001/api/comments?tag=positive&page=1&pageSize=5"
 
# Get statistics
curl http://localhost:3001/api/statistics
 
# Health check
curl http://localhost:3001/health

Test SSE

# Stream events (use curl or browser)
curl -N http://localhost:3001/api/sse/comments
 
# Should see events as they arrive:
# event: comment
# data: {...}

Verify Kafka Integration

# Publish test message to processed-comments
docker exec -it kafka-broker kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic processed-comments
 
# Type JSON (matching what consumer service publishes):
{"id":1,"commentId":"test","text":"Test","textHash":"abc123","tag":"positive","source":"twitter","processedAt":"2026-03-26T10:00:00Z","consumerId":"test-consumer","retryCount":0}
 
# Check API logs - should see "Received processed comment: test..."
# Check SSE stream - should emit event to connected clients

Next Steps