# Consumer Service

## Overview

The Consumer service is the core processing engine: it receives raw comments from Kafka, performs deduplication, calls sentiment analysis, stores results, and handles failures with retry logic.
## Purpose

- Process raw comments from the `raw-comments` topic
- Eliminate duplicates using Redis and LRU caching
- Enrich comments with sentiment tags via gRPC
- Store processed comments in PostgreSQL
- Publish enriched data to the `processed-comments` topic
- Handle failures gracefully with retry and a dead-letter queue
## Architecture

```
┌─────────────────────────┐
│   Kafka: raw-comments   │
└───────────┬─────────────┘
            │
            ▼
┌─────────────────────────┐
│   Deduplication Check   │
│      (Redis + LRU)      │
└───────────┬─────────────┘
            │
            ▼
┌─────────────────────────┐
│   Sentiment gRPC Call   │
│      (with cache)       │
└───────────┬─────────────┘
            │
            ▼
┌─────────────────────────┐
│   Save to PostgreSQL    │
└───────────┬─────────────┘
            │
            ▼
┌─────────────────────────┐
│   Publish: processed-   │
│        comments         │
└─────────────────────────┘
            │
            ▼ (on failure)
┌─────────────────────────┐
│   Retry Queue (5 max)   │
│  → Dead Letter Queue    │
└─────────────────────────┘
```

## Key Components
### 1. Comment Deduplication (Redis)

**Purpose:** Prevents reprocessing the same comment multiple times

**Location:** `consumer/src/redis.service.ts`
```typescript
async isCommentProcessed(commentId: string): Promise<boolean> {
  const key = `processed:${commentId}`
  const exists = await this.client.exists(key)
  return exists === 1
}

async markCommentAsProcessed(commentId: string): Promise<void> {
  const key = `processed:${commentId}`
  const ttl = parseInt(process.env.REDIS_TTL ?? '10800', 10) // 3 hours; env vars are strings
  await this.client.setex(key, ttl, '1')
}
```

**Configuration:**

```
REDIS_TTL=10800  # 3 hours (10,800 seconds)
```
**How it works:**

- Uses `commentId` (not text) as the deduplication key
- Key format: `processed:{commentId}`
- If the comment ID exists in Redis → skip processing
- If not found → mark as processed and continue
- The 3-hour TTL balances memory usage against the duplicate window

**Why `commentId` rather than a text hash:**

- The same comment may be submitted multiple times (5% in the producer)
- A text hash would treat distinct comments with identical text as duplicates; `commentId` keeps them apart
- `commentId` is the source system's unique identifier
### 1.5. Sentiment Result Caching (LRU)

**Purpose:** Avoids redundant sentiment analysis for identical text

**Location:** `consumer/src/cache.service.ts`
```typescript
hashText(text: string): string {
  return createHash('sha256')
    .update(text.toLowerCase().trim())
    .digest('hex')
}

getCachedSentiment(textHash: string): { tag: string; timestamp: number } | undefined {
  return this.textHashCache.get(textHash)
}

setCachedSentiment(textHash: string, tag: string): void {
  this.textHashCache.set(textHash, { tag, timestamp: Date.now() })
}
```

**Cache configuration:**

```typescript
const textHashCache = new LRUCache<string, { tag: string; timestamp: number }>({
  max: 100,            // CONSUMER_CACHE_SIZE
  ttl: 1000 * 60 * 60  // 1 hour
})
```

**How it works:**

- Hash the comment text (SHA256 of the lowercased, trimmed string)
- Check the cache for a sentiment result
- If hit → return the cached tag, skip the gRPC call
- If miss → call the sentiment service, cache the result

**Why cache by text hash:**

- Different comments can have identical text
- Caching sentiment results saves gRPC calls
- The LRU policy evicts least-recently-used entries when the cache is full
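The normalization step (lowercase + trim) means trivially different strings map to one cache key. The hashing can be run standalone to see this:

```typescript
import { createHash } from 'crypto'

// Same normalization as the cache service: lowercase + trim before hashing
function hashText(text: string): string {
  return createHash('sha256').update(text.toLowerCase().trim()).digest('hex')
}

console.log(hashText('  Great Food!') === hashText('great food!'))  // true: same key
console.log(hashText('Great food!') === hashText('Terrible food!')) // false: distinct keys
```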
**Deduplication flow:**

```
1. Check Redis: Is this commentId processed?
   └─ Yes → Skip (duplicate comment)
   └─ No  → Continue to step 2
2. Hash text, check LRU cache: Do we have sentiment for this text?
   └─ Yes → Use cached tag, skip gRPC
   └─ No  → Call sentiment service, cache result
```

### 2. Sentiment Client Service
**Location:** `consumer/src/sentiment-client.service.ts`

```typescript
async onModuleInit() {
  try {
    const { consumerId } = await this.grpcService.RegisterConsumer({
      serviceName: 'consumer-service'
    })
    this.consumerId = consumerId
    this.logger.log(`Registered with sentiment service: ${consumerId}`)
  } catch (error) {
    this.logger.warn('Failed to register, will use unauthenticated mode')
    this.consumerId = 'unregistered'
  }
}

async analyzeSentiment(text: string): Promise<SentimentTag> {
  const response = await this.grpcService.AnalyzeSentiment({
    consumerId: this.consumerId,
    text: text,
    timestamp: new Date().toISOString()
  })
  return response.tag // positive | negative | neutral | unrelated
}
```

**Rate limits:**

- Registered (has `consumerId`): 100 requests/second
- Unregistered: 10 requests/second
### 2.5. Consumer Registration

**Purpose:** Authenticate with the sentiment service for higher rate limits

**Location:** `consumer/src/sentiment-client.service.ts` (the `onModuleInit` hook shown above)

**How it works:**

- On service startup, the consumer registers with the sentiment service
- Receives a unique `consumerId` (UUID format)
- Includes the `consumerId` in all gRPC sentiment analysis requests
- Falls back to `'unregistered'` if registration fails (10 requests/second instead of the registered 100)

**Why it matters:**

- The `consumerId` is stored with each processed comment
- Enables tracking which consumer instance processed each comment
- Useful for debugging and monitoring distributed consumer instances
### 3. Retry Mechanism

**Configuration:**

```
CONSUMER_MAX_RETRIES=5
CONSUMER_RETRY_DELAY=1000  # Base delay in ms
```

**Backoff schedule** (formula: `baseDelay * 2^(attempt - 1)` with `baseDelay = 1000ms`):

```
Initial attempt:   Immediate (no delay)
Retry attempt 1:   1s delay  (1000ms * 2^0)
Retry attempt 2:   2s delay  (1000ms * 2^1)
Retry attempt 3:   4s delay  (1000ms * 2^2)
Retry attempt 4:   8s delay  (1000ms * 2^3)
Retry attempt 5:   16s delay (1000ms * 2^4)
After max retries: → Dead Letter Queue
```
```typescript
async processWithRetry(comment: RawComment, retryCount = 0): Promise<void> {
  try {
    const tag = await this.sentimentClient.analyzeSentiment(comment.text)
    await this.saveToDatabase({ ...comment, tag, retryCount })
    await this.publishProcessed({ ...comment, tag })
  } catch (error) {
    if (retryCount < MAX_RETRIES) {
      const delay = RETRY_DELAY * Math.pow(2, retryCount)
      await sleep(delay)
      return this.processWithRetry(comment, retryCount + 1)
    } else {
      await this.publishToDeadLetterQueue(comment, error)
    }
  }
}
```

**Why exponential backoff:**

- Gives the failing service time to recover
- Prevents overwhelming an already-degraded service
- Industry-standard pattern
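The retry loop can be demonstrated with a stub sentiment call that fails twice before succeeding. `flakyAnalyze` is a stand-in for the real gRPC client, and the base delay is shortened to 1ms so the sketch runs instantly (the real base is 1000ms):

```typescript
// Demo of the exponential-backoff retry loop with a flaky stub.
const sleep = (ms: number) => new Promise<void>(r => setTimeout(r, ms))

let calls = 0
async function flakyAnalyze(_text: string): Promise<string> {
  calls += 1
  if (calls < 3) throw new Error('gRPC timeout') // attempts 1 and 2 fail
  return 'positive'
}

async function processWithRetry(text: string, retryCount = 0): Promise<string> {
  const MAX_RETRIES = 5
  const BASE_DELAY = 1 // 1000 in production (CONSUMER_RETRY_DELAY)
  try {
    return await flakyAnalyze(text)
  } catch (error) {
    if (retryCount < MAX_RETRIES) {
      await sleep(BASE_DELAY * Math.pow(2, retryCount)) // 1ms, 2ms, 4ms, ...
      return processWithRetry(text, retryCount + 1)
    }
    throw error // the real service publishes to the dead-letter queue here
  }
}

processWithRetry('Great food!').then(tag => {
  console.log(`${tag} after ${calls} attempts`) // positive after 3 attempts
})
```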
### 4. Database Storage

**Entity:** `consumer/src/entities/processed-comment.entity.ts`

```typescript
@Entity('processed_comments')
export class ProcessedComment {
  @PrimaryGeneratedColumn()
  id: number

  @Column({ unique: true })
  commentId: string

  @Column()
  text: string

  @Column({ type: 'varchar', length: 64 })
  textHash: string // SHA256 hash for sentiment caching

  @Column({ type: 'enum', enum: ['positive', 'negative', 'neutral', 'unrelated'] })
  tag: string

  @Column()
  source: string

  @Column({ type: 'timestamp' })
  processedAt: Date

  @Column()
  consumerId: string // Which consumer instance processed it

  @Column({ default: 0 })
  retryCount: number
}
```

**Indexes:** `commentId` (unique), `textHash`, `tag`, `processedAt`

**Key fields:**

- `textHash`: SHA256 hash used for sentiment result caching
- `consumerId`: Tracks which consumer instance processed the comment
- `retryCount`: Number of retry attempts before successful processing

**Why these indexes:**

- `commentId`: Fast duplicate checks; the unique constraint ensures no reprocessing
- `textHash`: Quick sentiment cache lookups
- `tag`: Dashboard filters by sentiment
- `processedAt`: "Last hour" statistics and time-based queries
### 5. Kafka Topics

**Consumes from:**

- `raw-comments` - New comments from the producer
- `retry-queue` - Failed comments to retry

**Publishes to:**

- `processed-comments` - Successfully processed comments
- `dead-letter-queue` - Failed after max retries
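For reference, the payload shapes on these topics can be written out as TypeScript types. These are inferred from the JSON examples in this document, not taken from the codebase, so the names are assumptions:

```typescript
// Hypothetical shapes for the topic payloads, inferred from the examples below.
interface RawCommentMessage {
  commentId: string
  text: string
  source: string
  timestamp: string // ISO 8601
}

type SentimentTag = 'positive' | 'negative' | 'neutral' | 'unrelated'

// processed-comments: the raw message enriched with processing results
interface ProcessedCommentMessage extends RawCommentMessage {
  tag: SentimentTag
  processedAt: string // ISO 8601
  retryCount: number
}

// dead-letter-queue: the raw message wrapped with failure context
interface DeadLetterMessage {
  originalMessage: RawCommentMessage
  error: string
  retryCount: number
  failedAt: string // ISO 8601
}
```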
## Behavior Details

### Message Processing Flow

Step-by-step:

1. **Receive message** from `raw-comments`:

   ```json
   { "commentId": "abc-123", "text": "Great food!", "source": "twitter", "timestamp": "2026-03-26T10:30:00Z" }
   ```

2. **Check for duplicates**

   - Check Redis for `processed:abc-123`; if found, log "Duplicate detected" and skip
   - Hash the text: `sha256("great food!")` → `3a2f1b...`
   - Check the LRU cache for the hash; on a hit, reuse the cached tag and skip the gRPC call

3. **Call the Sentiment service** (on a cache miss):

   ```typescript
   const tag = await sentimentClient.analyzeSentiment("Great food!") // Returns: "positive"
   ```

4. **Save to the database:**

   ```sql
   INSERT INTO processed_comments ("commentId", text, tag, source, "processedAt", "retryCount")
   VALUES ('abc-123', 'Great food!', 'positive', 'twitter', NOW(), 0)
   ```

5. **Publish to `processed-comments`:**

   ```json
   { "commentId": "abc-123", "text": "Great food!", "tag": "positive", "source": "twitter", "processedAt": "2026-03-26T10:30:05Z", "retryCount": 0 }
   ```
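The five steps can be stitched into one handler. This sketch stubs every stage with an in-memory implementation (all names hypothetical, `analyze` is a toy, the hash is simplified to the normalized text) so only the control flow matters:

```typescript
// Control-flow sketch of the processing pipeline; every stage is an in-memory stub.
type Raw = { commentId: string; text: string; source: string; timestamp: string }

const processedIds = new Set<string>()           // stands in for Redis
const sentimentCache = new Map<string, string>() // stands in for the LRU cache
const db: object[] = []                          // stands in for PostgreSQL
const outTopic: object[] = []                    // stands in for processed-comments

async function analyze(text: string): Promise<string> {
  return text.includes('!') ? 'positive' : 'neutral' // toy sentiment, not the real service
}

async function handleMessage(msg: Raw): Promise<'processed' | 'duplicate'> {
  if (processedIds.has(msg.commentId)) return 'duplicate'         // step 2a: Redis check
  const hash = msg.text.toLowerCase().trim()                      // step 2b: hash (simplified)
  const tag = sentimentCache.get(hash) ?? await analyze(msg.text) // step 3: cache or gRPC
  sentimentCache.set(hash, tag)
  db.push({ ...msg, tag, retryCount: 0 })                         // step 4: save
  outTopic.push({ ...msg, tag })                                  // step 5: publish
  processedIds.add(msg.commentId)
  return 'processed'
}
```

Replaying the same `commentId` through `handleMessage` returns `'duplicate'` and leaves `db` and `outTopic` untouched, mirroring the skip path above.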
### Failure Handling

**Scenario 1: Sentiment service timeout** (initial attempt plus 5 retries, per `CONSUMER_MAX_RETRIES`):

```
Attempt 1 (initial): gRPC call fails
Wait 1s
Attempt 2 (retry 1): gRPC call fails
Wait 2s
Attempt 3 (retry 2): gRPC call fails
Wait 4s
Attempt 4 (retry 3): gRPC call fails
Wait 8s
Attempt 5 (retry 4): gRPC call fails
Wait 16s
Attempt 6 (retry 5): gRPC call fails
→ Publish to dead-letter-queue
```

**Scenario 2: Transient database failure**

```
Attempt 1: Database save fails
Wait 1s
Attempt 2: Database save succeeds
→ Continue to publish to processed-comments
```

**Dead-letter message format:**

```json
{
  "originalMessage": { /* raw comment */ },
  "error": "gRPC call timeout after 5s",
  "retryCount": 5,
  "failedAt": "2026-03-26T10:35:00Z"
}
```

## Standalone Mode
**Bootstrap:** `consumer/src/main.ts`

```typescript
async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
    transport: Transport.KAFKA,
    options: {
      client: {
        brokers: [KAFKA_BROKER]
      },
      consumer: {
        groupId: 'restaurant-comments-consumer'
      }
    }
  })
  await app.listen()
  // No HTTP server
}
```

- Pure Kafka consumer
- No HTTP endpoints needed
- Lightweight
## Performance Characteristics

**Processing rate:**

- Without duplicates: ~50-100 comments/second
- With duplicates: ~200+ comments/second (cached lookups)

**Latency breakdown (per comment):**

- Deduplication: 1-5ms (LRU) or 10-20ms (Redis)
- gRPC call: 50-200ms (depends on text length)
- Database save: 10-30ms
- Kafka publish: 5-10ms

**Total:** ~75-265ms per comment
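As a back-of-envelope check of that total, the stage bounds above can be summed directly (the low end takes the Redis dedup minimum; the ~265ms upper figure presumably adds an LRU lookup on top of the worst case):

```typescript
// Sum of per-stage latency bounds quoted above: dedup + gRPC + db save + Kafka publish
const minMs = 10 + 50 + 10 + 5
const maxMs = 20 + 200 + 30 + 10
console.log(minMs, maxMs) // 75 260
```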
## Monitoring

**Logs to watch:**

```
[Consumer] Processing comment: abc-123
[Consumer] Duplicate detected: def-456 (skipped)
[Consumer] Sentiment analysis: positive (took 125ms)
[Consumer] Saved to database: abc-123
[Consumer] Published to processed-comments: abc-123
[Consumer] Retry attempt 3/5: ghi-789
[Consumer] Moved to DLQ: jkl-012 (max retries exceeded)
```

**Metrics to track:**

- Processed count
- Duplicate count
- Retry count
- DLQ count
- Average processing time
- Cache hit rate
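A minimal way to track these is a counter object with a derived hit rate. The class and field names here are hypothetical; the codebase may expose metrics differently:

```typescript
// Hypothetical metrics holder: raw counters plus a derived cache hit rate.
class ConsumerMetrics {
  processed = 0
  duplicates = 0
  retries = 0
  dlq = 0
  cacheHits = 0
  cacheMisses = 0

  get cacheHitRate(): number {
    const total = this.cacheHits + this.cacheMisses
    return total === 0 ? 0 : this.cacheHits / total // guard against divide-by-zero at startup
  }
}

const m = new ConsumerMetrics()
m.cacheHits = 75
m.cacheMisses = 25
console.log(m.cacheHitRate) // 0.75
```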
## Development

**Run locally:**

```bash
cd consumer
pnpm dev
```

**Run migrations:**

```bash
cd consumer
pnpm run migration:run
```

**Create a migration:**

```bash
cd consumer
pnpm run migration:create AddIndexToComments
```

## Testing
### Manual Test - Send Comment

```bash
# Publish to raw-comments
docker exec -it kafka-broker kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic raw-comments

# Then type JSON:
{"commentId":"test-123","text":"Test comment","source":"twitter","timestamp":"2026-03-26T10:00:00Z"}
```

### Check Processing

```bash
# View consumer logs
docker logs -f kafka-consumer

# Query database (camelCase columns must be quoted in psql)
docker exec kafka-postgres psql -U postgres -d restaurant_comments \
  -c 'SELECT * FROM processed_comments ORDER BY "processedAt" DESC LIMIT 5'

# Check processed-comments topic
docker exec -it kafka-broker kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic processed-comments \
  --from-beginning
```

## Next Steps

- **Sentiment Service** - How sentiment analysis works
- **API & SSE** - How data reaches the dashboard
- **Getting Started** - Full data flow