# Consumer Service

## Overview
The Consumer service is the core processing engine that receives raw comments from Kafka, performs deduplication, calls sentiment analysis, stores results, and handles failures with retry logic.
## Purpose

- Process raw comments from the `raw-comments` topic
- Eliminate duplicates using Redis (by `commentId`)
- Cache sentiment results in an LRU cache (by `textHash`)
- Enrich comments with sentiment tags via gRPC
- Store processed comments in PostgreSQL
- Publish enriched data to the `processed-comments` topic
- Handle failures gracefully with retry and dead-letter queues
## Architecture

```
┌─────────────────────────┐
│   Kafka: raw-comments   │
└───────────┬─────────────┘
            │
            ▼
┌─────────────────────────┐
│   Deduplication Check   │
│  (Redis by commentId)   │
└───────────┬─────────────┘
            │
            ▼
┌─────────────────────────┐
│   Sentiment gRPC Call   │
│ (LRU cache by textHash) │
└───────────┬─────────────┘
            │
            ▼
┌─────────────────────────┐
│   Save to PostgreSQL    │
└───────────┬─────────────┘
            │
            ▼
┌─────────────────────────┐
│   Publish: processed-   │
│        comments         │
└─────────────────────────┘
            │
            ▼ (on failure)
┌─────────────────────────┐
│   Retry Queue (5 max)   │
│  → Dead Letter Queue    │
└─────────────────────────┘
```

## Key Components
### 1. Comment Deduplication (Redis)

Purpose: Prevents reprocessing the same comment multiple times

Location: `consumer/src/redis.service.ts`
```typescript
async isCommentProcessed(commentId: string): Promise<boolean> {
  const key = REDIS_KEYS.PROCESSED_COMMENT(commentId)
  const exists = await this.client.exists(key)
  return exists === 1
}

async markCommentAsProcessed(commentId: string): Promise<void> {
  const key = REDIS_KEYS.PROCESSED_COMMENT(commentId)
  const ttl = parseInt(process.env.REDIS_TTL || String(CACHE_CONFIG.REDIS_TTL_SECONDS))
  await this.client.setex(key, ttl, '1')
}
```

Configuration:

```bash
REDIS_TTL=10800  # 3 hours (default from CACHE_CONFIG.REDIS_TTL_SECONDS)
```
How it works:

- Uses `commentId` as the deduplication key
- Key format: `processed:{commentId}`
- If the comment ID exists in Redis → skip processing
- If not found → mark as processed and continue
- The 3-hour TTL balances memory usage against the duplicate-detection window

Why `commentId` rather than a text hash:

- `commentId` is the unique identifier from the source system
- A text hash would cause different comments with identical text to be incorrectly deduplicated
- Redis tracks which specific comments have been processed
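As a minimal sketch of this check, an in-memory `Map` can stand in for Redis (the real service uses `SETEX`; the key format and TTL below follow the documented values):

```typescript
// In-memory sketch of the Redis dedup check. The Map stores
// key -> expiry timestamp (ms), mimicking SETEX-style expiry.
const PROCESSED_TTL_MS = 10_800 * 1000 // REDIS_TTL=10800 seconds (3 hours)
const store = new Map<string, number>()

function processedKey(commentId: string): string {
  return `processed:${commentId}` // documented key format
}

function isCommentProcessed(commentId: string): boolean {
  const expiry = store.get(processedKey(commentId))
  return expiry !== undefined && expiry > Date.now()
}

function markCommentAsProcessed(commentId: string): void {
  store.set(processedKey(commentId), Date.now() + PROCESSED_TTL_MS)
}
```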
### 1.5. Sentiment Result Caching (LRU)

Purpose: Avoids redundant sentiment analysis for identical text

Location: `consumer/src/cache.service.ts`
```typescript
hashText(text: string): string {
  return createHash('sha256')
    .update(text.toLowerCase().trim())
    .digest('hex')
}

getCachedSentiment(textHash: string): { tag: string, timestamp: number } | undefined {
  return this.textHashCache.get(textHash)
}

setCachedSentiment(textHash: string, tag: string): void {
  this.textHashCache.set(textHash, { tag, timestamp: Date.now() })
}
```

Cache configuration:

```typescript
const textHashCache = new LRUCache({
  max: 100,            // CONSUMER_CACHE_SIZE
  ttl: 1000 * 60 * 60, // 1 hour
})
```

How it works:

- Hash the comment text (SHA-256 of the lowercased, trimmed string)
- Check the cache for a sentiment result
- On a hit → return the cached tag and skip the gRPC call
- On a miss → call the sentiment service and cache the result

Why this helps:

- Different comments can have identical text
- Caching sentiment results saves gRPC calls
- The LRU cache evicts the least-recently-used entries when full
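The normalization step is runnable on its own with Node's built-in `crypto` module; this sketch mirrors the snippet above, so casing and whitespace variants of the same text share one cache entry:

```typescript
import { createHash } from 'node:crypto'

// SHA-256 of the lowercased, trimmed text: "Great food!" and
// "  GREAT FOOD!  " normalize to the same string and hash identically.
function hashText(text: string): string {
  return createHash('sha256').update(text.toLowerCase().trim()).digest('hex')
}
```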
The two checks work together:

```
1. Check Redis: Is this commentId processed?
   └─ Yes → Skip (duplicate comment)
   └─ No  → Continue to step 2

2. Hash text, check LRU cache: Do we have sentiment for this text?
   └─ Yes → Use cached tag, skip gRPC
   └─ No  → Call sentiment service, cache result
```

### 2. Sentiment Client Service
Location: `consumer/src/sentiment-client.service.ts`
Registration on startup:

```typescript
async onModuleInit() {
  try {
    const { consumerId } = await this.grpcService.RegisterConsumer({
      consumerName: 'consumer-service'
    })
    this.consumerId = consumerId
    this.logger.log(`Registered with sentiment service: ${consumerId}`)
  } catch (error) {
    this.logger.warn('Failed to register, will use unauthenticated mode')
    this.consumerId = 'unregistered'
  }
}
```

Sentiment analysis call:

```typescript
async analyzeSentiment(request: SentimentRequest): Promise<SentimentResponse> {
  const enrichedRequest = {
    ...request,
    consumerId: this.consumerId,
  }
  return new Promise((resolve, reject) => {
    this.client.AnalyzeSentiment(enrichedRequest, (error, response) => {
      if (error) reject(error)
      else resolve(response)
    })
  })
}
```

Request fields:

- `commentId`: unique comment identifier
- `text`: comment text to analyze
- `textHash`: SHA-256 hash of the normalized text
- `consumerId`: the consumer's registered ID (added automatically)
Rate limits:

- Registered (has `consumerId`): 100 requests/second
- Unregistered: 10 requests/second
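The automatic `consumerId` injection is a plain spread-merge; a small sketch (the `SentimentRequest` shape below is assumed from the documented field list, not taken from the source):

```typescript
// Hypothetical request shape based on the documented fields.
interface SentimentRequest {
  commentId: string
  text: string
  textHash: string
}

// Merge the registered consumerId into an outgoing request,
// as the enrichedRequest step in analyzeSentiment does.
function enrichRequest(request: SentimentRequest, consumerId: string) {
  return { ...request, consumerId }
}
```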
### 2.5. Consumer Registration

Purpose: Authenticate with the sentiment service for higher rate limits

Location: `consumer/src/sentiment-client.service.ts`
```typescript
async onModuleInit() {
  try {
    const { consumerId } = await this.grpcService.RegisterConsumer({
      consumerName: 'consumer-service'
    })
    this.consumerId = consumerId
    this.logger.log(`Registered with sentiment service: ${consumerId}`)
  } catch (error) {
    this.logger.warn('Failed to register, will use unauthenticated mode')
    this.consumerId = 'unregistered'
  }
}
```

How it works:

- On service startup, the consumer registers with the sentiment service
- It receives a unique `consumerId` (UUID format)
- It includes the `consumerId` in all gRPC sentiment analysis requests
- It falls back to `'unregistered'` if registration fails
Rate limits:

- Registered consumers: 100 requests/second
- Unregistered: 10 requests/second

Tracking:

- The `consumerId` is stored with each processed comment
- This enables tracking which consumer instance processed each comment
- Useful for debugging and monitoring distributed consumer instances
### 3. Retry Mechanism

Configuration:

```bash
CONSUMER_MAX_RETRIES=5
CONSUMER_RETRY_DELAY=1000  # Base delay in ms
```

Retry schedule (exponential backoff):

```
Initial attempt:   Immediate processing
On failure:        Publish to retry-queue with a scheduled time
Retry attempt 1:   1s delay  (1000ms * 2^0)
Retry attempt 2:   2s delay  (1000ms * 2^1)
Retry attempt 3:   4s delay  (1000ms * 2^2)
Retry attempt 4:   8s delay  (1000ms * 2^3)
Retry attempt 5:   16s delay (1000ms * 2^4)
After max retries: → Dead Letter Queue
```

Formula: `baseDelay * 2^(attempt - 1)` where `baseDelay = 1000ms`
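The schedule reduces to a one-line helper; a sketch (parameter names are illustrative, the default matches `CONSUMER_RETRY_DELAY=1000`):

```typescript
// delay = baseDelay * 2^(attempt - 1), with attempt starting at 1,
// producing the documented 1s, 2s, 4s, 8s, 16s sequence.
function calculateRetryDelay(attempt: number, baseDelayMs = 1000): number {
  return baseDelayMs * 2 ** (attempt - 1)
}
```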
Implementation:

```typescript
private async handleRetry(comment: RawComment, attempt: number, errorMessage: string): Promise<void> {
  const maxRetries = parseInt(process.env.CONSUMER_MAX_RETRIES || String(RETRY_CONFIG.MAX_ATTEMPTS))
  if (attempt >= maxRetries) {
    await this.sendToDeadLetterQueue(comment, errorMessage, attempt)
    return
  }
  const delay = this.calculateRetryDelay(attempt)
  const retryContext: RetryContext = {
    comment,
    attempts: attempt,
    lastError: errorMessage,
    scheduledTime: Date.now() + delay, // in the payload so handleRetryQueue can read it
  }
  // Send to retry queue with the scheduled time
  await lastValueFrom(
    this.kafkaProducer.emit(KAFKA_TOPICS.RETRY_QUEUE, {
      key: comment.commentId,
      value: JSON.stringify(retryContext),
      headers: {
        'retry-attempt': String(attempt),
        'scheduled-time': String(Date.now() + delay),
      },
    })
  )
}

@EventPattern(KAFKA_TOPICS.RETRY_QUEUE)
async handleRetryQueue(@Payload() message: any): Promise<void> {
  const retryContext = RetryContextSchema.parse(message)
  const scheduledTime = retryContext.scheduledTime || Date.now()
  // Skip if it is not yet time to retry
  if (scheduledTime > Date.now()) {
    return
  }
  // Process the retry
  await this.handleRawComment(retryContext.comment)
}
```

Why exponential backoff:

- Gives the failing service time to recover
- Prevents overwhelming an already struggling service
- An industry-standard pattern
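The control flow in `handleRetry` reduces to a small pure function; a sketch under the documented defaults (`Decision` and `nextStep` are illustrative names, not from the source):

```typescript
// Retry-vs-DLQ decision: once the attempt count reaches maxRetries the
// message is dead-lettered; otherwise it is re-queued with an
// exponentially growing delay (baseDelay * 2^(attempt - 1)).
type Decision =
  | { action: 'retry'; delayMs: number }
  | { action: 'dead-letter' }

function nextStep(attempt: number, maxRetries = 5, baseDelayMs = 1000): Decision {
  if (attempt >= maxRetries) return { action: 'dead-letter' }
  return { action: 'retry', delayMs: baseDelayMs * 2 ** (attempt - 1) }
}
```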
### 4. Database Storage

Entity: `consumer/src/entities/processed-comment.entity.ts`
```typescript
@Entity('processed_comments')
export class ProcessedComment {
  @PrimaryGeneratedColumn()
  id: number

  @Column({ unique: true })
  commentId: string

  @Column()
  text: string

  @Column()
  @Index()
  textHash: string // SHA256 hash for sentiment caching

  @Column({
    type: 'enum',
    enum: CommentTag,
  })
  @Index()
  tag: CommentTag

  @Column()
  source: string

  @CreateDateColumn()
  @Index()
  processedAt: Date

  @Column()
  consumerId: string // Which consumer instance processed it

  @Column({ default: 0 })
  retryCount: number
}
```

Indexes: `commentId` (unique), `textHash`, `tag`, `processedAt`

Key fields:

- `textHash`: SHA-256 hash used for sentiment result caching
- `consumerId`: tracks which consumer instance processed the comment
- `retryCount`: number of retry attempts before successful processing
Why these indexes:

- `commentId`: fast duplicate checks; ensures no reprocessing
- `textHash`: quick sentiment cache lookups
- `tag`: the dashboard filters by sentiment
- `processedAt`: "last hour" statistics and time-based queries
### 5. Kafka Topics

Consumes from:

- `raw-comments` - new comments from the producer
- `retry-queue` - failed comments to retry

Produces to:

- `processed-comments` - successfully processed comments
- `dead-letter-queue` - comments that failed after max retries
## Behavior Details

### Message Processing Flow

1. Receive a message from `raw-comments`:

   ```json
   { "commentId": "abc-123", "text": "Great food!", "source": "twitter", "timestamp": "2026-03-26T10:30:00Z" }
   ```

2. Check for duplicates:
   - Check Redis for the key `processed:abc-123`
   - If found: log "Duplicate detected" and skip
   - If not found: continue to step 3

3. Hash the text and check the LRU cache:
   - Hash the text: `sha256("great food!")` → `3a2f1b...`
   - Check the LRU cache for this `textHash`
   - If cached: use the cached tag and skip gRPC
   - If not cached: continue to step 4

4. Call the sentiment service:

   ```typescript
   const response = await sentimentClient.analyzeSentiment({
     commentId: 'abc-123',
     text: 'Great food!',
     textHash: '3a2f1b...',
     consumerId: this.consumerId,
   })
   // response.tag: "positive"
   // Cache the result for future use
   ```

5. Save to the database:

   ```sql
   INSERT INTO processed_comments (commentId, text, textHash, tag, source, processedAt, consumerId, retryCount)
   VALUES ('abc-123', 'Great food!', '3a2f1b...', 'positive', 'twitter', NOW(), 'consumer-123', 0)
   ```

6. Mark the comment as processed in Redis:

   ```typescript
   await redisService.markCommentAsProcessed('abc-123')
   ```

7. Publish to `processed-comments`:

   ```json
   { "commentId": "abc-123", "text": "Great food!", "tag": "positive", "source": "twitter", "processedAt": "2026-03-26T10:30:05Z", "retryCount": 0 }
   ```
### Failure Handling

Scenario 1: Sentiment service timeout

```
Attempt 1: gRPC call fails
Wait 1s
Attempt 2: gRPC call fails
Wait 2s
Attempt 3: gRPC call fails
Wait 4s
Attempt 4: gRPC call fails
Wait 8s
Attempt 5: gRPC call fails
→ Publish to dead-letter-queue
```

Scenario 2: Transient database failure

```
Attempt 1: Database save fails
Wait 1s
Attempt 2: Database save succeeds
→ Continue to publish to processed-comments
```

Dead-letter queue message format:

```json
{
  "originalMessage": { /* raw comment */ },
  "error": "gRPC call timeout after 5s",
  "retryCount": 5,
  "failedAt": "2026-03-26T10:35:00Z"
}
```

## Standalone Mode
Bootstrap: `consumer/src/main.ts`
```typescript
async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
    transport: Transport.KAFKA,
    options: {
      client: {
        clientId: process.env.KAFKA_CLIENT_ID || 'consumer',
        brokers: [process.env.KAFKA_BROKER || 'localhost:9092'],
      },
      consumer: {
        groupId: KAFKA_CONSUMER_GROUPS.CONSUMER,
        sessionTimeout: 30000,
        heartbeatInterval: 3000,
      },
      subscribe: {
        fromBeginning: true, // Process all messages from the beginning
      },
    },
  })
  await app.listen()
  // No HTTP server - standalone Kafka microservice
}
```

- Pure Kafka consumer
- No HTTP endpoints needed
- Lightweight
## Performance Characteristics

Processing rate:

- Without duplicates: ~50-100 comments/second
- With duplicates: ~200+ comments/second (cached lookups)

Per-comment latency:

- Deduplication: 1-5ms (LRU) or 10-20ms (Redis)
- gRPC call: 50-200ms (depends on text length)
- Database save: 10-30ms
- Kafka publish: 5-10ms

Total: ~75-265ms per comment
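The total is just the sum of the stages; a quick worked check under the Redis-dedup figures (the bounds come out at 75ms and 260ms, bracketed by the quoted ~75-265ms range):

```typescript
// Per-stage latency ranges in ms, [min, max], from the list above.
const stages: Array<[number, number]> = [
  [10, 20],  // deduplication (Redis path)
  [50, 200], // gRPC sentiment call
  [10, 30],  // database save
  [5, 10],   // Kafka publish
]

const totalMin = stages.reduce((sum, [lo]) => sum + lo, 0)
const totalMax = stages.reduce((sum, [, hi]) => sum + hi, 0)
```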
## Monitoring

Logs to watch:

```
[ConsumerService] Received comment: abc-123...
[ConsumerService] Comment abc-123 already processed, skipping
[ConsumerService] Using cached sentiment for hash: 3a2f1b...
[SentimentClient] Analyzed sentiment for: abc-123... [positive] in 125ms
[ConsumerService] Successfully processed comment: abc-123... [positive]
[ConsumerService] Max retries reached for comment ghi-789, sending to DLQ
```

Metrics to track:

- Processed count
- Duplicate count
- Retry count
- DLQ count
- Average processing time
- Cache hit rate
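The cache hit rate is derived from two counters; a sketch (the counter names are illustrative, the service does not expose these exact fields):

```typescript
// hits / (hits + misses), guarding against division by zero
// before any traffic has arrived.
function cacheHitRate(hits: number, misses: number): number {
  const total = hits + misses
  return total === 0 ? 0 : hits / total
}
```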
## Development

Run locally:

```bash
cd consumer
pnpm dev
```

Environment variables:

```bash
KAFKA_BROKER=localhost:9092
KAFKA_CLIENT_ID=consumer
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_TTL=10800
CONSUMER_MAX_RETRIES=5
CONSUMER_CACHE_SIZE=100
```
## Testing

### Manual Test - Send a Comment

```bash
# Publish to raw-comments
docker exec -it kafka-broker kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic raw-comments

# Then type JSON:
{"commentId":"test-123","text":"Test comment","source":"twitter","timestamp":"2026-03-26T10:00:00Z"}
```

### Check Processing

```bash
# View consumer logs
docker logs -f kafka-consumer

# Query the database
docker exec kafka-postgres psql -U postgres -d restaurant_comments \
  -c "SELECT * FROM processed_comments ORDER BY processedAt DESC LIMIT 5"

# Check the processed-comments topic
docker exec -it kafka-broker kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic processed-comments \
  --from-beginning
```

## Next Steps

- Sentiment Service - how sentiment analysis works
- API & SSE - how data reaches the dashboard
- Getting Started - the full data flow