API & SSE Service
Overview
The API service is a hybrid NestJS application that acts as both an HTTP server (REST endpoints) and a Kafka consumer. It consumes processed comments from Kafka, reads existing comments from PostgreSQL (populated by the consumer service), serves them via a REST API, and broadcasts new comments in real time using Server-Sent Events (SSE).
Purpose
- Provide REST endpoints for dashboard data fetching
- Stream real-time comment updates via Server-Sent Events
- Consume processed-comments from Kafka
- Calculate aggregate statistics (for REST endpoint)
- Bridge between backend processing and frontend UI
Architecture
```
┌──────────────────────┐
│     Kafka Topic      │
│ (processed-comments) │
└──────────┬───────────┘
           │
           ▼
┌──────────────────────┐
│    Kafka Consumer    │
│       Service        │
└──────────┬───────────┘
           │
           ▼
     ┌─────────┐
     │   SSE   │
     │ Service │
     └────┬────┘
          │
          ▼
  ┌────────────┐
  │  Subject   │
  │   (RxJS)   │
  └────┬───────┘
       │
       ▼
┌─────────────────────────┐       ┌──────────────┐
│ HTTP Endpoints          │◄──────┤  PostgreSQL  │
│ - GET /api/comments     │ reads │  (populated  │
│ - GET /api/statistics   │       │ by consumer) │
│ - GET /api/sse/...      │       └──────────────┘
│ - GET /health           │
└─────────────────────────┘
```

Key Components
1. Kafka Consumer Service
Location: api/src/kafka-consumer.service.ts
Purpose: Listens to processed-comments topic and streams to SSE clients
```typescript
@Controller()
export class KafkaConsumerService {
  @EventPattern(KAFKA_TOPICS.PROCESSED_COMMENTS)
  async handleProcessedComment(data: ProcessedComment) {
    // Comment is already saved by the consumer service;
    // the API only needs to broadcast it via SSE for real-time updates
    this.sseService.emitComment(data)
  }
}
```

- Receives processed comments for real-time streaming
- Comments already persisted by the consumer service
- Decouples API from consumer processing
- Multiple API instances can stream the same topic
- Event-driven updates without database polling

Note: The API does not re-save comments to the database. The consumer service already persisted them; the API's role is purely streaming via SSE for real-time dashboard updates.
2. Comments Service
Location: api/src/comments.service.ts
```typescript
@Injectable()
export class CommentsService {
  async findAll(
    page: number = 1,
    pageSize: number = 20,
    tag?: CommentTag,
    search?: string,
  ): Promise<PaginatedResponse<ProcessedComment>> {
    const skip = (page - 1) * pageSize

    const where: FindOptionsWhere<ProcessedComment> = {}
    if (tag) {
      where.tag = tag
    }
    if (search) {
      where.text = ILike(`%${search}%`)
    }

    const [data, total] = await this.commentRepository.findAndCount({
      where,
      skip,
      take: pageSize,
      order: {
        processedAt: 'DESC',
      },
    })

    return {
      data,
      total,
      page,
      pageSize,
      totalPages: Math.ceil(total / pageSize),
    }
  }

  async getStatistics(): Promise<CommentStatistics> {
    const total = await this.commentRepository.count()

    // Get counts by tag
    const tagCounts = await this.commentRepository
      .createQueryBuilder('comment')
      .select('comment.tag', 'tag')
      .addSelect('COUNT(*)', 'count')
      .groupBy('comment.tag')
      .getRawMany()

    const byTag = {
      [CommentTag.POSITIVE]: 0,
      [CommentTag.NEGATIVE]: 0,
      [CommentTag.NEUTRAL]: 0,
      [CommentTag.UNRELATED]: 0,
    }
    tagCounts.forEach((row) => {
      byTag[row.tag as CommentTag] = parseInt(row.count, 10)
    })

    // Get recent count (last hour) — MoreThan (from 'typeorm') is needed here;
    // an equality check on the timestamp would match almost nothing
    const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000)
    const recentCount = await this.commentRepository.count({
      where: {
        processedAt: MoreThan(oneHourAgo),
      },
    })

    return {
      total,
      byTag,
      recentCount,
    }
  }
}
```

3. SSE Service
Location: api/src/sse.service.ts
Purpose: Manages Server-Sent Events broadcasting to connected clients
Implementation using RxJS:

```typescript
interface SseEvent {
  data: any
  type?: string
}

@Injectable()
export class SseService {
  private readonly commentSubject = new Subject<ProcessedComment>()
  private readonly commentStream$: Observable<SseEvent>

  constructor() {
    this.commentStream$ = this.commentSubject.asObservable().pipe(
      map((comment) => ({
        data: comment,
        type: 'comment',
      })),
      share(),
    )
  }

  getCommentStream(): Observable<SseEvent> {
    return this.commentStream$
  }

  emitComment(comment: ProcessedComment): void {
    this.commentSubject.next(comment)
  }
}
```

- Multiple clients can subscribe
- Broadcasts to all connections
- Automatic cleanup when connections close
- Reactive programming pattern
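The broadcast semantics of the Subject can be illustrated with a dependency-free stand-in. This is a sketch of the pattern only, not the actual RxJS implementation; `MiniSubject` is a hypothetical name:

```typescript
// Minimal stand-in for an RxJS Subject: every value passed to next()
// is delivered to all currently subscribed listeners.
type Listener<T> = (value: T) => void

class MiniSubject<T> {
  private listeners = new Set<Listener<T>>()

  subscribe(fn: Listener<T>): () => void {
    this.listeners.add(fn)
    // Returning an unsubscribe function mirrors RxJS Subscription.unsubscribe()
    return () => {
      this.listeners.delete(fn)
    }
  }

  next(value: T): void {
    for (const fn of this.listeners) fn(value)
  }
}

const subject = new MiniSubject<string>()
const seenA: string[] = []
const seenB: string[] = []

const unsubscribeA = subject.subscribe((v) => seenA.push(v))
subject.subscribe((v) => seenB.push(v))

subject.next('first')  // both clients receive it
unsubscribeA()         // client A disconnects
subject.next('second') // only client B receives it

// seenA = ['first'], seenB = ['first', 'second']
```

This mirrors what happens when SSE clients connect and disconnect: a disconnect simply removes one listener, and later emissions reach the rest.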
4. HTTP Controllers
Comments Controller
Location: api/src/comments.controller.ts
Endpoint: GET /api/comments
Query parameters:

- page (default: 1)
- pageSize (default: 20)
- tag (optional: positive | negative | neutral | unrelated)
- search (optional: text search)
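The offset/limit arithmetic behind these parameters can be isolated into a small pure helper. This is a hypothetical sketch mirroring the math in CommentsService.findAll, not part of the actual service:

```typescript
// Hypothetical helper mirroring the pagination math in findAll:
// page is 1-based; skip/take map directly onto SQL OFFSET/LIMIT.
interface PageMeta {
  skip: number
  take: number
  totalPages: number
}

function paginate(page: number, pageSize: number, total: number): PageMeta {
  return {
    skip: (page - 1) * pageSize, // rows to skip before this page
    take: pageSize,              // rows to return
    totalPages: Math.ceil(total / pageSize),
  }
}

// page=2, pageSize=10, total=156 → skip 10 rows, 16 pages overall
const meta = paginate(2, 10, 156)
```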
Example request:

```
GET /api/comments?page=2&pageSize=10&tag=positive&search=food
```

Example response:

```json
{
  "data": [
    {
      "id": 123,
      "commentId": "abc-123",
      "text": "Amazing food!",
      "textHash": "3a2f1b4e...",
      "tag": "positive",
      "source": "instagram",
      "processedAt": "2026-03-26T10:30:00.000Z",
      "consumerId": "consumer-1234567890",
      "retryCount": 0
    }
  ],
  "total": 156,
  "page": 2,
  "pageSize": 10,
  "totalPages": 16
}
```

Statistics Controller
Location: api/src/statistics.controller.ts
Endpoint: GET /api/statistics
Example response:

```json
{
  "total": 1523,
  "byTag": {
    "positive": 456,
    "negative": 123,
    "neutral": 789,
    "unrelated": 155
  },
  "recentCount": 234
}
```

SSE Controller
Location: api/src/sse.controller.ts
Endpoint: GET /api/sse/comments
Response: Event stream (text/event-stream)
```
event: comment
data: {"commentId":"abc-123","text":"Great!","tag":"positive",...}

event: comment
data: {"commentId":"def-456","text":"Terrible!","tag":"negative",...}
```

```typescript
@Controller('api/sse')
export class SseController {
  @Sse('comments')
  streamComments(): Observable<any> {
    // Heartbeat to keep connection alive
    const heartbeat$ = interval(30000).pipe(
      map(() => ({ data: { type: 'heartbeat' }, type: 'heartbeat' })),
    )

    // Merge comment stream with heartbeat
    return merge(this.sseService.getCommentStream(), heartbeat$)
  }
}
```

Message format:

- event: comment - Comment event type
- data: <json> - JSON payload
- Empty line - Message separator
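Producing a frame in this format is a one-liner. The helper below is a hypothetical sketch of the wire format (NestJS's @Sse does this serialization for you):

```typescript
// Hypothetical formatter producing text/event-stream frames like the ones above:
// an event type line, a JSON data line, and a blank-line separator.
function toSseFrame(type: string, data: unknown): string {
  return `event: ${type}\ndata: ${JSON.stringify(data)}\n\n`
}

const frame = toSseFrame('comment', { commentId: 'abc-123', tag: 'positive' })
// frame === 'event: comment\ndata: {"commentId":"abc-123","tag":"positive"}\n\n'
```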
5. Health Endpoint
Endpoint: GET /health
```json
{
  "status": "healthy",
  "service": "api",
  "statistics": {
    "total": 1523,
    "byTag": {
      "positive": 456,
      "negative": 123,
      "neutral": 789,
      "unrelated": 155
    },
    "recentCount": 234
  },
  "timestamp": "2026-03-26T10:30:00.000Z"
}
```

Behavior Details
SSE Connection Flow
Client (Dashboard) side:

```typescript
const eventSource = new EventSource('http://localhost:3001/api/sse/comments')

eventSource.addEventListener('comment', (event) => {
  const comment = JSON.parse(event.data)
  // Validate with schema and insert into TanStack DB collection
})

eventSource.onerror = () => {
  // Browser automatically reconnects
}
```

Server side:

```typescript
// When a comment is processed:
this.sseService.emitComment(comment)
// → Sent to all connected clients
```

Message Flow
Complete flow from Kafka to Dashboard:

1. Consumer service publishes to the processed-comments topic: {"commentId":"abc","text":"Great!","tag":"positive",...}
2. The API's Kafka consumer receives the message
3. The API emits an SSE event (event: comment / data: {"commentId":"abc",...})
4. All connected browsers receive the event
5. The dashboard updates its UI in real time
Note: API does not save to database. The consumer service already persisted the comment. API only streams via SSE for real-time updates.
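On the browser side, EventSource does the frame parsing for you. The sketch below shows, in simplified form, how a raw text/event-stream body breaks into messages; a real EventSource also handles id:, retry:, comment lines, and data: without a space, which this deliberately skips:

```typescript
// Simplified parser for the frames shown above: messages are separated by a
// blank line; `event:` names the type, `data:` lines carry the payload.
interface SseMessage {
  event: string
  data: string
}

function parseSse(chunk: string): SseMessage[] {
  return chunk
    .split('\n\n') // blank line terminates each message
    .filter((block) => block.trim().length > 0)
    .map((block) => {
      let event = 'message' // SSE default when no event: field is present
      const dataLines: string[] = []
      for (const line of block.split('\n')) {
        if (line.startsWith('event: ')) event = line.slice(7)
        else if (line.startsWith('data: ')) dataLines.push(line.slice(6))
      }
      return { event, data: dataLines.join('\n') }
    })
}

const messages = parseSse(
  'event: comment\ndata: {"commentId":"abc"}\n\n' +
  'event: heartbeat\ndata: {"type":"heartbeat"}\n\n',
)
// messages[0].event === 'comment', messages[1].event === 'heartbeat'
```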
CORS Configuration
Enabled for the dashboard origin:

```typescript
app.enableCors({
  origin: process.env.API_CORS_ORIGIN || 'http://localhost:3000',
  credentials: true,
})
```

- Dashboard runs on a different port (3000)
- API runs on port 3001
- Browser requires CORS headers
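On the wire, this configuration boils down to a pair of response headers that the browser checks before handing the response to the dashboard. A simplified sketch (Nest/Express also handle preflight OPTIONS requests and more, which this omits):

```typescript
// Simplified view of the headers the browser needs to see on API responses
// for a credentialed cross-origin dashboard; not Nest internals.
function corsHeaders(allowedOrigin: string): Record<string, string> {
  return {
    'Access-Control-Allow-Origin': allowedOrigin,
    'Access-Control-Allow-Credentials': 'true',
  }
}

const headers = corsHeaders('http://localhost:3000')
```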
Hybrid Application
Bootstrap: api/src/main.ts
```typescript
async function bootstrap() {
  const app = await NestFactory.create(AppModule)

  // Enable CORS for dashboard
  app.enableCors({
    origin: process.env.API_CORS_ORIGIN || 'http://localhost:3000',
    credentials: true,
  })

  // Add Kafka microservice
  app.connectMicroservice<MicroserviceOptions>({
    transport: Transport.KAFKA,
    options: {
      client: {
        clientId: process.env.KAFKA_CLIENT_ID || 'api',
        brokers: [process.env.KAFKA_BROKER || 'localhost:9092'],
      },
      consumer: {
        groupId: KAFKA_CONSUMER_GROUPS.API,
        sessionTimeout: 30000,
        heartbeatInterval: 3000,
      },
      subscribe: {
        fromBeginning: false, // Only new messages for the API service
      },
    },
  })

  await app.startAllMicroservices()

  const port = process.env.API_PORT || 3001
  await app.listen(port)
}
```

- Single service handles both concerns
- Shared database connection
- Simpler deployment
- Direct coordination between Kafka and HTTP
Performance Characteristics
REST endpoints:

- /api/comments: 20-50ms (database query)
- /api/statistics: 10-30ms (cached or aggregated)

SSE:

- Connection overhead: < 10ms
- Event delivery: < 5ms
- Concurrent connections: 100+ supported

Kafka consumer:

- Lag: typically 0-10 messages
- Processing: 5-20ms per message
Monitoring
Logs:

```
[API] Kafka consumer microservice started
[API] Subscribed to topic: processed-comments
[API] API service is running on: http://localhost:3001
[API] Received processed comment: abc-123... [positive]
[API] New SSE client connected
[API] Emitted SSE event for comment: abc-123...
```

Metrics to watch:

- Total comments in database
- Active SSE connections
- Kafka consumer lag
- REST endpoint latencies
- Events emitted per second
Development
Run locally:

```
cd api
pnpm dev
```

Environment variables:

```
API_PORT=3001
API_CORS_ORIGIN=http://localhost:3000
KAFKA_BROKER=localhost:9092
POSTGRES_HOST=localhost
```
Testing
Test REST Endpoints

```
# Get comments
curl http://localhost:3001/api/comments

# Get with filters
curl "http://localhost:3001/api/comments?tag=positive&page=1&pageSize=5"

# Get statistics
curl http://localhost:3001/api/statistics

# Health check
curl http://localhost:3001/health
```

Test SSE

```
# Stream events (use curl or a browser)
curl -N http://localhost:3001/api/sse/comments

# Should see events as they arrive:
# event: comment
# data: {...}
```

Verify Kafka Integration

```
# Publish a test message to processed-comments
docker exec -it kafka-broker kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic processed-comments

# Type JSON (matching what the consumer service publishes):
{"id":1,"commentId":"test","text":"Test","textHash":"abc123","tag":"positive","source":"twitter","processedAt":"2026-03-26T10:00:00Z","consumerId":"test-consumer","retryCount":0}

# Check API logs - should see "Received processed comment: test..."
# Check SSE stream - should emit the event to connected clients
```

Next Steps
- Frontend - How dashboard consumes the API
- Consumer Service - How comments are processed
- Getting Started - Full data flow