Consumer Service

Overview

The Consumer service is the core processing engine that receives raw comments from Kafka, performs deduplication, calls sentiment analysis, stores results, and handles failures with retry logic.

Purpose

  • Process raw comments from raw-comments topic
  • Eliminate duplicates using Redis (by commentId)
  • Cache sentiment results using LRU (by textHash)
  • Enrich comments with sentiment tags via gRPC
  • Store processed comments in PostgreSQL
  • Publish enriched data to processed-comments topic
  • Handle failures gracefully with retry and dead-letter queue

Architecture

┌─────────────────────────┐
│  Kafka: raw-comments    │
└───────────┬─────────────┘
            ▼
┌─────────────────────────┐
│  Deduplication Check    │
│  (Redis by commentId)   │
└───────────┬─────────────┘
            ▼
┌─────────────────────────┐
│  Sentiment gRPC Call    │
│  (LRU cache by textHash)│
└───────────┬─────────────┘
            ▼
┌─────────────────────────┐
│  Save to PostgreSQL     │
└───────────┬─────────────┘
            ▼
┌─────────────────────────┐
│  Publish: processed-    │
│  comments               │
└─────────────────────────┘

            ▼ (on failure)
┌─────────────────────────┐
│  Retry Queue (5 max)    │
│  → Dead Letter Queue    │
└─────────────────────────┘

Key Components

1. Comment Deduplication (Redis)

Purpose: Prevents reprocessing the same comment multiple times

Location: consumer/src/redis.service.ts

Implementation:
async isCommentProcessed(commentId: string): Promise<boolean> {
  const key = REDIS_KEYS.PROCESSED_COMMENT(commentId)
  const exists = await this.client.exists(key)
  return exists === 1
}
 
async markCommentAsProcessed(commentId: string): Promise<void> {
  const key = REDIS_KEYS.PROCESSED_COMMENT(commentId)
  const ttl = parseInt(process.env.REDIS_TTL || String(CACHE_CONFIG.REDIS_TTL_SECONDS))
  await this.client.setex(key, ttl, '1')
}
Configuration:
REDIS_TTL=10800  # 3 hours (default from CACHE_CONFIG.REDIS_TTL_SECONDS)
How it works:
  • Uses commentId as deduplication key
  • Key format: processed:{commentId}
  • If comment ID exists in Redis → skip processing
  • If not found → mark as processed and continue
  • 3-hour TTL balances memory usage vs duplicate window
Why commentId instead of text hash:
  • commentId is the unique identifier from the source system
  • Text hash would cause different comments with identical text to be incorrectly deduplicated
  • Redis tracks which specific comments have been processed

1.5. Sentiment Result Caching (LRU)

Purpose: Avoids redundant sentiment analysis for identical text

Location: consumer/src/cache.service.ts

Implementation:
hashText(text: string): string {
  return createHash('sha256')
    .update(text.toLowerCase().trim())
    .digest('hex')
}
 
getCachedSentiment(textHash: string): { tag: string, timestamp: number } | undefined {
  return this.textHashCache.get(textHash)
}
 
setCachedSentiment(textHash: string, tag: string): void {
  this.textHashCache.set(textHash, { tag, timestamp: Date.now() })
}
Configuration:
const textHashCache = new LRUCache({
  max: 100,  // CONSUMER_CACHE_SIZE
  ttl: 1000 * 60 * 60  // 1 hour
})
How it works:
  • Hash comment text (SHA256 of lowercase, trimmed)
  • Check cache for sentiment result
  • If hit → return cached tag, skip gRPC call
  • If miss → call sentiment service, cache result
Why separate from deduplication:
  • Different comments can have identical text
  • Caching sentiment results saves gRPC calls
  • LRU evicts least-recently-used entries when full
Processing Flow:
1. Check Redis: Is this commentId processed?
   ├─ Yes → Skip (duplicate comment)
   └─ No → Continue to step 2
 
2. Hash text, check LRU cache: Do we have sentiment for this text?
   ├─ Yes → Use cached tag, skip gRPC
   └─ No → Call sentiment service, cache result
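The two-layer flow above can be sketched end to end. This is an illustrative stand-in, not the real services: the Set and Map stand in for Redis and the LRU cache, and analyzeSentiment is a stub for the gRPC call.

```typescript
import { createHash } from 'crypto'

// Stand-ins for illustration only: the real service uses redis.service.ts
// (Set → Redis) and cache.service.ts (Map → LRU cache).
const processedIds = new Set<string>()
const sentimentCache = new Map<string, string>()

// Same normalization as hashText in cache.service.ts: lowercase + trim
function hashText(text: string): string {
  return createHash('sha256').update(text.toLowerCase().trim()).digest('hex')
}

// Stub standing in for the gRPC sentiment call
async function analyzeSentiment(_text: string): Promise<string> {
  return 'positive'
}

async function processComment(comment: { commentId: string; text: string }): Promise<string | null> {
  // Layer 1: skip any commentId we have already processed
  if (processedIds.has(comment.commentId)) return null
  processedIds.add(comment.commentId)

  // Layer 2: reuse a cached tag for identical (normalized) text
  const textHash = hashText(comment.text)
  const cached = sentimentCache.get(textHash)
  if (cached !== undefined) return cached

  // Cache miss: analyze, then cache by textHash for future comments
  const tag = await analyzeSentiment(comment.text)
  sentimentCache.set(textHash, tag)
  return tag
}
```

Note that a different commentId with identical text is still processed and stored, but hits the sentiment cache instead of the gRPC service.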

2. Sentiment Client Service

Location: consumer/src/sentiment-client.service.ts

Auto-registration on startup:
async onModuleInit() {
  try {
    const { consumerId } = await this.grpcService.RegisterConsumer({
      consumerName: 'consumer-service'
    })
    
    this.consumerId = consumerId
    this.logger.log(`Registered with sentiment service: ${consumerId}`)
  } catch (error) {
    this.logger.warn('Failed to register, will use unauthenticated mode')
    this.consumerId = 'unregistered'
  }
}
Sentiment Analysis Call:
async analyzeSentiment(request: SentimentRequest): Promise<SentimentResponse> {
  const enrichedRequest = {
    ...request,
    consumerId: this.consumerId,
  }
  
  return new Promise((resolve, reject) => {
    this.client.AnalyzeSentiment(enrichedRequest, (error, response) => {
      if (error) reject(error)
      else resolve(response)
    })
  })
}
Request fields:
  • commentId: Unique comment identifier
  • text: Comment text to analyze
  • textHash: SHA256 hash of normalized text
  • consumerId: Consumer's registered ID (auto-added)
Rate Limits:
  • Registered (has consumerId): 100 requests/second
  • Unregistered: 10 requests/second

2.5. Consumer Registration

Purpose: Authenticate with sentiment service for higher rate limits

Location: consumer/src/sentiment-client.service.ts

Registration Flow: identical to the onModuleInit hook shown under Sentiment Client Service above.
What happens:
  • On service startup, consumer registers with sentiment service
  • Receives unique consumerId (UUID format)
  • Includes consumerId in all gRPC sentiment analysis requests
  • Falls back to "unregistered" if registration fails
Rate Limits:
  • Registered consumers: 100 requests/second
  • Unregistered: 10 requests/second
Database tracking:
  • consumerId is stored with each processed comment
  • Enables tracking which consumer instance processed each comment
  • Useful for debugging and monitoring distributed consumer instances

3. Retry Mechanism

Configuration:
CONSUMER_MAX_RETRIES=5
CONSUMER_RETRY_DELAY=1000  # Base delay in ms
Exponential Backoff:
Initial attempt: Immediate processing
On failure: Publish to retry-queue with scheduled time
Retry attempt 1: 1s delay (1000ms * 2^0)
Retry attempt 2: 2s delay (1000ms * 2^1)
Retry attempt 3: 4s delay (1000ms * 2^2)
Retry attempt 4: 8s delay (1000ms * 2^3)
Retry attempt 5: 16s delay (1000ms * 2^4)
After max retries: → Dead Letter Queue

Formula: baseDelay * 2^(attempt - 1) where baseDelay = 1000ms
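The calculateRetryDelay helper referenced in the implementation is not shown in this section; a minimal sketch matching the formula (the baseDelayMs parameter name is an assumption, its default mirrors CONSUMER_RETRY_DELAY=1000):

```typescript
// Sketch of the backoff formula: baseDelay * 2^(attempt - 1)
function calculateRetryDelay(attempt: number, baseDelayMs = 1000): number {
  return baseDelayMs * 2 ** (attempt - 1)
}
```

calculateRetryDelay(1) yields 1000ms and calculateRetryDelay(5) yields 16000ms, matching the schedule above.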

Implementation:
private async handleRetry(comment: RawComment, attempt: number, errorMessage: string): Promise<void> {
  const maxRetries = parseInt(process.env.CONSUMER_MAX_RETRIES || String(RETRY_CONFIG.MAX_ATTEMPTS))
 
  if (attempt >= maxRetries) {
    await this.sendToDeadLetterQueue(comment, errorMessage, attempt)
    return
  }
 
  const delay = this.calculateRetryDelay(attempt)
  
  const retryContext: RetryContext = {
    comment,
    attempts: attempt,
    lastError: errorMessage,
    // Carried in the payload so handleRetryQueue can read it;
    // the header copy below is informational only
    scheduledTime: Date.now() + delay,
  }
 
  // Send to retry queue with scheduled time
  await lastValueFrom(
    this.kafkaProducer.emit(KAFKA_TOPICS.RETRY_QUEUE, {
      key: comment.commentId,
      value: JSON.stringify(retryContext),
      headers: {
        'retry-attempt': String(attempt),
        'scheduled-time': String(Date.now() + delay),
      },
    })
  )
}
 
@EventPattern(KAFKA_TOPICS.RETRY_QUEUE)
async handleRetryQueue(@Payload() message: any): Promise<void> {
  const retryContext = RetryContextSchema.parse(message)
  const scheduledTime = retryContext.scheduledTime || Date.now()
  
  // Skip if not yet time to retry
  if (scheduledTime > Date.now()) {
    return
  }
  
  // Process the retry
  await this.handleRawComment(retryContext.comment)
}
Why exponential backoff:
  • Gives service time to recover
  • Prevents overwhelming failed service
  • Industry standard pattern

4. Database Storage

Entity: consumer/src/entities/processed-comment.entity.ts

@Entity('processed_comments')
export class ProcessedComment {
  @PrimaryGeneratedColumn()
  id: number
 
  @Column({ unique: true })
  commentId: string
 
  @Column()
  text: string
 
  @Column()
  @Index()
  textHash: string  // SHA256 hash for sentiment caching
 
  @Column({
    type: 'enum',
    enum: CommentTag,
  })
  @Index()
  tag: CommentTag
 
  @Column()
  source: string
 
  @CreateDateColumn()
  @Index()
  processedAt: Date
 
  @Column()
  consumerId: string  // Which consumer instance processed it
 
  @Column({ default: 0 })
  retryCount: number
}

Indexes: commentId (unique), textHash, tag, processedAt

Key fields:
  • textHash: SHA256 hash used for sentiment result caching
  • consumerId: Tracks which consumer instance processed the comment
  • retryCount: Number of retry attempts before successful processing
Why these indexes:
  • commentId: Fast duplicate checks and ensures no reprocessing
  • textHash: Quick sentiment cache lookups
  • tag: Dashboard filters by sentiment
  • processedAt: "Last hour" statistics and time-based queries
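As a sketch of how the tag and processedAt indexes get used, a "last hour by sentiment" query might look like the following. The helper is hypothetical; the column name follows the entity above, quoted because it is camelCase in PostgreSQL.

```typescript
// Hypothetical helper: builds the parameterized SQL for "last hour" stats.
// Both tag and processedAt are indexed, so this stays fast as the table grows.
function lastHourStatsQuery(now: Date = new Date()): { sql: string; params: [Date] } {
  const cutoff = new Date(now.getTime() - 60 * 60 * 1000) // one hour ago
  return {
    sql: 'SELECT tag, COUNT(*) AS count FROM processed_comments WHERE "processedAt" >= $1 GROUP BY tag',
    params: [cutoff],
  }
}
```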

5. Kafka Topics

Consumes from:
  • raw-comments - New comments from producer
  • retry-queue - Failed comments to retry
Publishes to:
  • processed-comments - Successfully processed comments
  • dead-letter-queue - Failed after max retries

Behavior Details

Message Processing Flow

Step-by-step:
  1. Receive message from raw-comments

    {
      "commentId": "abc-123",
      "text": "Great food!",
      "source": "twitter",
      "timestamp": "2026-03-26T10:30:00Z"
    }
  2. Check if duplicate
    • Check Redis for the key processed:abc-123
    • If found: Log "Duplicate detected" and skip
    • If not found: Mark as processed and continue
  3. Hash text and check LRU cache
    • Hash text: sha256("great food!") → 3a2f1b...
    • Check LRU cache for this textHash
    • If cached: Use cached tag, skip gRPC
    • If not cached: Continue to step 4
  4. Call Sentiment service
    const response = await sentimentClient.analyzeSentiment({
      commentId: 'abc-123',
      text: 'Great food!',
      textHash: '3a2f1b...',
      consumerId: this.consumerId
    })
    // response.tag: "positive"
    // Cache the result for future use
  5. Save to database
    INSERT INTO processed_comments (commentId, text, textHash, tag, source, processedAt, consumerId, retryCount)
    VALUES ('abc-123', 'Great food!', '3a2f1b...', 'positive', 'twitter', NOW(), 'consumer-123', 0)
  6. Mark as processed in Redis
    await redisService.markCommentAsProcessed('abc-123')
  7. Publish to processed-comments
    {
      "commentId": "abc-123",
      "text": "Great food!",
      "tag": "positive",
      "source": "twitter",
      "processedAt": "2026-03-26T10:30:05Z",
      "retryCount": 0
    }

Failure Handling

Scenario 1: Sentiment service timeout
Attempt 1: gRPC call fails
Wait 1s
Attempt 2: gRPC call fails
Wait 2s
Attempt 3: gRPC call fails
Wait 4s
Attempt 4: gRPC call fails
Wait 8s
Attempt 5: gRPC call fails
→ Publish to dead-letter-queue
Scenario 2: Database connection lost
Attempt 1: Database save fails
Wait 1s
Attempt 2: Database save succeeds
→ Continue to publish processed-comments
Dead Letter Queue message:
{
  "originalMessage": { /* raw comment */ },
  "error": "gRPC call timeout after 5s",
  "retryCount": 5,
  "failedAt": "2026-03-26T10:35:00Z"
}
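The sendToDeadLetterQueue helper referenced in the retry handler is not shown in this section; a minimal sketch producing the message shape above (the function name buildDeadLetterMessage and its body are assumptions):

```typescript
interface DeadLetterMessage {
  originalMessage: unknown
  error: string
  retryCount: number
  failedAt: string // ISO-8601 timestamp
}

// Builds the DLQ payload; the actual service would then emit it to
// the dead-letter-queue topic via the Kafka producer.
function buildDeadLetterMessage(
  original: unknown,
  error: string,
  retryCount: number,
): DeadLetterMessage {
  return {
    originalMessage: original,
    error,
    retryCount,
    failedAt: new Date().toISOString(),
  }
}
```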

Standalone Mode

Bootstrap: consumer/src/main.ts

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
    transport: Transport.KAFKA,
    options: {
      client: {
        clientId: process.env.KAFKA_CLIENT_ID || 'consumer',
        brokers: [process.env.KAFKA_BROKER || 'localhost:9092'],
      },
      consumer: {
        groupId: KAFKA_CONSUMER_GROUPS.CONSUMER,
        sessionTimeout: 30000,
        heartbeatInterval: 3000,
      },
      subscribe: {
        fromBeginning: true, // Process all messages from beginning
      },
    },
  })
  
  await app.listen()
  // No HTTP server - standalone Kafka microservice
}
Why microservice mode:
  • Pure Kafka consumer
  • No HTTP endpoints needed
  • Lightweight

Performance Characteristics

Processing Rate:
  • Unique comments: ~50-100 comments/second
  • Duplicate-heavy traffic: ~200+ comments/second (duplicates and cache hits skip the gRPC call)
Latency breakdown:
  • Duplicate/cache check: 1-5ms (in-process LRU) or 10-20ms (Redis round trip)
  • gRPC call: 50-200ms (depends on text length)
  • Database save: 10-30ms
  • Kafka publish: 5-10ms

Total: ~75-265ms per comment

Monitoring

Logs to watch:
[ConsumerService] Received comment: abc-123...
[ConsumerService] Comment abc-123 already processed, skipping
[ConsumerService] Using cached sentiment for hash: 3a2f1b...
[SentimentClient] Analyzed sentiment for: abc-123... [positive] in 125ms
[ConsumerService] Successfully processed comment: abc-123... [positive]
[ConsumerService] Max retries reached for comment ghi-789, sending to DLQ
Metrics:
  • Processed count
  • Duplicate count
  • Retry count
  • DLQ count
  • Average processing time
  • Cache hit rate
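A minimal in-memory sketch of those counters (class and field names are illustrative; a real deployment would export them through a metrics library such as prom-client):

```typescript
// Illustrative counters for the metrics listed above
class ConsumerMetrics {
  processed = 0
  duplicates = 0
  retries = 0
  dlq = 0
  cacheHits = 0
  cacheMisses = 0
  private totalProcessingMs = 0

  recordProcessing(durationMs: number): void {
    this.processed++
    this.totalProcessingMs += durationMs
  }

  get averageProcessingMs(): number {
    return this.processed === 0 ? 0 : this.totalProcessingMs / this.processed
  }

  get cacheHitRate(): number {
    const lookups = this.cacheHits + this.cacheMisses
    return lookups === 0 ? 0 : this.cacheHits / lookups
  }
}
```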

Development

Run locally:
cd consumer
pnpm dev
Environment variables:
KAFKA_BROKER=localhost:9092
KAFKA_CLIENT_ID=consumer
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_TTL=10800
CONSUMER_MAX_RETRIES=5
CONSUMER_CACHE_SIZE=100

Testing

Manual Test - Send Comment

# Publish to raw-comments
docker exec -it kafka-broker kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic raw-comments
 
# Then type JSON:
{"commentId":"test-123","text":"Test comment","source":"twitter","timestamp":"2026-03-26T10:00:00Z"}

Check Processing

# View consumer logs
docker logs -f kafka-consumer
 
# Query database
docker exec kafka-postgres psql -U postgres -d restaurant_comments \
  -c 'SELECT * FROM processed_comments ORDER BY "processedAt" DESC LIMIT 5'
 
# Check processed-comments topic
docker exec -it kafka-broker kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic processed-comments \
  --from-beginning

Next Steps