
Producer Service

Overview

The Producer service generates mock restaurant comments and publishes them to Kafka's raw-comments topic. It runs as a standalone application without an HTTP server.

Purpose

Simulates real-world comment streams from social media platforms to exercise the entire processing pipeline, including:

  • High-volume message generation
  • Duplicate detection
  • Failure handling
  • Real-time processing

Architecture

┌─────────────────────────┐
│   Comment Generator     │
│  (40+ templates)        │
└───────────┬─────────────┘
            │
            ▼
┌─────────────────────────┐
│   Variable Delay        │
│  (100ms - 10s)          │
└───────────┬─────────────┘
            │
            ▼
┌─────────────────────────┐
│   Duplicate Injector    │
│  (5% rate)              │
└───────────┬─────────────┘
            │
            ▼
┌─────────────────────────┐
│   Kafka Producer        │
│  → raw-comments topic   │
└─────────────────────────┘

Key Components

Comment Generator Service

Location: producer/src/comment-generator.service.ts

Template Categories:
  1. Positive Comments (10 templates)
    • "Amazing food! Best restaurant in town! 😍"
    • "The service was exceptional, will definitely come back!"
    • "Perfect place for a romantic dinner ❤️"
  2. Negative Comments (10 templates)
    • "Waited 2 hours for our food. Terrible service."
    • "The food was cold and tasteless. Very disappointed."
    • "Overpriced and underwhelming. Won't be returning."
  3. Neutral Comments (10 templates)
    • "The food was okay, nothing special."
    • "Average experience. Wouldn't go out of my way to visit."
    • "Decent portions, average taste."
  4. Unrelated Comments (10+ templates)
    • "Does anyone know what time they close?"
    • "Check out my food blog! Link in bio"
    • "Random spam message about crypto investment"

Total: 40+ unique templates

Comment Generation Logic

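generateComment(): RawComment {
  // With probability duplicateRate (5% by default), replay an earlier comment
  const shouldDuplicate = Math.random() < this.duplicateRate
  const sources = ['twitter', 'instagram', 'facebook', 'tiktok']

  // Declared before the branches so the return statement can see them
  let commentId: string
  let text: string

  if (shouldDuplicate && this.processedComments.size > 0) {
    // Pick a random previous comment and look up its cached text
    commentId = pickRandom(this.processedComments)
    text = this.getTextFromCache(commentId)
  } else {
    // Generate a new comment from a random template
    commentId = uuidv4()
    text = this.fillTemplate(pickRandom(this.commentTemplates))
  }

  // Assumption: source is picked fresh here for both paths; the duplicate
  // path is documented as reusing the original comment's source
  const source = pickRandom(sources)

  return { commentId, text, source, timestamp: new Date() }
}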

Behavior Details

Variable Speed Generation

Configuration:
PRODUCER_MIN_DELAY=100    # 100ms minimum
PRODUCER_MAX_DELAY=10000   # 10s maximum
How it works:
  • Random delay between min and max
  • Creates realistic burst patterns
  • Tests consumer's handling of varying load
Example pattern:
Comment 1 → 150ms → Comment 2 → 8500ms → Comment 3 → 350ms → Comment 4
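
The delay selection described above can be sketched as follows. This is a minimal illustration rather than the service's actual code: `parseDelayConfig` and `randomDelayMs` are hypothetical names, and drawing uniformly from the inclusive range is an assumption.

```typescript
// Hypothetical helpers; the names and the uniform distribution are assumptions.
interface DelayConfig {
  minMs: number;
  maxMs: number;
}

// Reads the two delay variables from an env-like record. Missing values fall
// back to the documented defaults (100 ms and 10 s); an invalid range throws.
function parseDelayConfig(env: Record<string, string | undefined>): DelayConfig {
  const minMs = Number(env["PRODUCER_MIN_DELAY"] ?? "100");
  const maxMs = Number(env["PRODUCER_MAX_DELAY"] ?? "10000");
  if (!Number.isFinite(minMs) || !Number.isFinite(maxMs) || minMs < 0 || minMs > maxMs) {
    throw new RangeError(`invalid delay range: ${minMs}..${maxMs}`);
  }
  return { minMs, maxMs };
}

// Returns an integer delay in [minMs, maxMs]. The random source is injectable
// so the behavior can be checked deterministically.
function randomDelayMs(cfg: DelayConfig, rng: () => number = Math.random): number {
  return Math.floor(cfg.minMs + rng() * (cfg.maxMs - cfg.minMs + 1));
}
```

A producer loop would then wait `randomDelayMs(cfg)` milliseconds between publishes, which yields the mix of short and long gaps shown in the example pattern.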

Duplicate Injection

Configuration:
PRODUCER_DUPLICATE_RATE=0.05  # 5% of comments
How it works:
  1. With probability PRODUCER_DUPLICATE_RATE (5% by default), skip generating a new comment
  2. Pick a random previously generated comment ID
  3. Reuse the same text and source
  4. Generate a new timestamp
Why:
  • Tests Redis deduplication
  • Tests LRU cache behavior
  • Simulates real-world duplicate submissions
Example:
Comment A (ID: abc-123, Text: "Great food!")
... 50 comments later ...
Comment B (ID: abc-123, Text: "Great food!")  ← Duplicate

Note: The current implementation reuses the commentId but regenerates similar text from templates, so the text of a duplicate is not guaranteed to match the original exactly. In a production system, you would store an exact text-to-ID mapping to create truly identical duplicates.
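
One way to get truly identical duplicates is to keep the full payload of each generated comment and replay it. The sketch below is an assumption about how that could look, not the service's code: `DuplicateInjector`, `CachedComment`, and the `isDuplicate` flag are hypothetical names.

```typescript
interface RawComment {
  commentId: string;
  text: string;
  source: string;
  timestamp: Date;
}

// The parts of a comment that must be identical in a replayed duplicate.
type CachedComment = Pick<RawComment, "commentId" | "text" | "source">;

// Hypothetical helper: remembers every fresh comment and occasionally replays one.
class DuplicateInjector {
  private readonly history: CachedComment[] = [];
  private readonly duplicateRate: number;
  private readonly rng: () => number;

  constructor(duplicateRate: number, rng: () => number = Math.random) {
    this.duplicateRate = duplicateRate;
    this.rng = rng;
  }

  // With probability duplicateRate, returns an earlier comment with the same
  // ID, text, and source but a new timestamp. Otherwise records and returns
  // the comment produced by createFresh.
  next(createFresh: () => CachedComment): RawComment & { isDuplicate: boolean } {
    const replay = this.history.length > 0 && this.rng() < this.duplicateRate;
    if (replay) {
      const previous = this.history[Math.floor(this.rng() * this.history.length)];
      return { ...previous, timestamp: new Date(), isDuplicate: true };
    }
    const fresh = createFresh();
    this.history.push(fresh);
    return { ...fresh, timestamp: new Date(), isDuplicate: false };
  }
}
```

The history in this sketch grows without bound; a long-running producer would likely cap it, for example by keeping only the most recent entries.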

Source Distribution

Sources:
  • Twitter
  • Instagram
  • Facebook
  • TikTok

Distribution: Random uniform (25% each)

Why multiple sources:
  • Tests source-based filtering
  • Simulates multi-platform aggregation
  • Provides grouping dimension for analytics
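
A uniform pick over the four sources can be sketched as below. `SOURCES`, `Source`, and `pickSource` are hypothetical names used for illustration; the lowercase identifiers match the values shown in the message format.

```typescript
// Hypothetical names; the list mirrors the four documented sources.
const SOURCES = ["twitter", "instagram", "facebook", "tiktok"] as const;
type Source = (typeof SOURCES)[number];

// Each source is chosen with equal probability (25% each) when rng is uniform.
function pickSource(rng: () => number = Math.random): Source {
  const index = Math.floor(rng() * SOURCES.length);
  return SOURCES[index];
}

// Tallies comments per source, the grouping dimension mentioned above.
function countBySource(samples: readonly Source[]): Record<Source, number> {
  const counts: Record<Source, number> = { twitter: 0, instagram: 0, facebook: 0, tiktok: 0 };
  for (const source of samples) {
    counts[source] += 1;
  }
  return counts;
}
```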

Template Variables

Some templates include placeholders:

"The {dish} was to die for! Coming back tomorrow"
Filled with:
  • "pasta"
  • "burger"
  • "steak"
  • "salad"
  • "pizza"
Result:
"The pasta was to die for! Coming back tomorrow"
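
Placeholder substitution of this kind can be sketched as follows. The function name matches the `fillTemplate` call in the generation logic, but its signature, the `PLACEHOLDER_VALUES` table, and the choice to leave unknown placeholders untouched are assumptions.

```typescript
// Assumed lookup table: placeholder name -> candidate values.
const PLACEHOLDER_VALUES: Record<string, readonly string[]> = {
  dish: ["pasta", "burger", "steak", "salad", "pizza"],
};

// Replaces every {name} placeholder with a random value from the table.
// Placeholders with no known values are left unchanged.
function fillTemplate(
  template: string,
  values: Record<string, readonly string[]> = PLACEHOLDER_VALUES,
  rng: () => number = Math.random,
): string {
  return template.replace(/\{(\w+)\}/g, (match: string, name: string) => {
    const options = values[name];
    if (!options || options.length === 0) {
      return match;
    }
    return options[Math.floor(rng() * options.length)];
  });
}
```

Templates without placeholders pass through unchanged, so the same function can be applied to every template.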

Message Format

Published to: raw-comments topic

Structure:
interface RawComment {
  commentId: string      // UUID v4
  text: string          // Filled template
  source: string        // twitter | instagram | facebook | tiktok
  timestamp: Date       // Generation time
}
Example:
{
  "commentId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "text": "Amazing food! Best restaurant in town! 😍",
  "source": "instagram",
  "timestamp": "2026-03-26T10:30:45.123Z"
}
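
Note that the interface declares `timestamp` as a `Date` while the JSON example carries an ISO 8601 string: `JSON.stringify` converts dates to strings, and `JSON.parse` does not convert them back. A minimal sketch of the round trip, with `serializeComment` and `parseComment` as hypothetical helper names:

```typescript
interface RawComment {
  commentId: string;
  text: string;
  source: string;
  timestamp: Date;
}

// JSON.stringify calls Date.prototype.toJSON, producing an ISO 8601 string.
function serializeComment(comment: RawComment): string {
  return JSON.stringify(comment);
}

// Validates the payload shape and revives the timestamp into a Date.
function parseComment(payload: string): RawComment {
  const data = JSON.parse(payload) as Record<string, unknown> | null;
  if (data === null || typeof data !== "object") {
    throw new TypeError("payload is not a JSON object");
  }
  const { commentId, text, source, timestamp } = data;
  if (
    typeof commentId !== "string" ||
    typeof text !== "string" ||
    typeof source !== "string" ||
    typeof timestamp !== "string"
  ) {
    throw new TypeError("payload is missing a required string field");
  }
  const parsedTimestamp = new Date(timestamp);
  if (Number.isNaN(parsedTimestamp.getTime())) {
    throw new TypeError("timestamp is not a valid date");
  }
  return { commentId, text, source, timestamp: parsedTimestamp };
}
```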

Standalone Mode

Bootstrap: producer/src/main.ts

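import { NestFactory } from '@nestjs/core'
import { AppModule } from './app.module' // assumed module path

async function bootstrap() {
  // Application context only: no HTTP server, purely a Kafka producer
  const app = await NestFactory.createApplicationContext(AppModule)

  // Runs until SIGTERM/SIGINT, then closes the context for cleanup
  process.on('SIGTERM', () => app.close())
  process.on('SIGINT', () => app.close())
}

bootstrap()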
Why standalone:
  • No need for HTTP endpoints
  • Lightweight (no Express overhead)
  • Single responsibility: generate messages

Monitoring

Logs:
[Producer] Published comment: abc-123 (twitter)
[Producer] Published comment: def-456 (instagram) [DUPLICATE]
Metrics tracked:
  • Total comments generated
  • Duplicate rate
  • Comments per second
  • Source distribution
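
The four metrics above can be derived from two counters, a per-source tally, and a start time. The class below is a sketch under that assumption; `ProducerMetrics` and its method names are hypothetical and do not describe the service's actual metrics code.

```typescript
// Hypothetical in-memory metrics tracker for the four listed metrics.
class ProducerMetrics {
  private total = 0;
  private duplicates = 0;
  private readonly bySource: Record<string, number> = {};
  private readonly now: () => number;
  private readonly startedAtMs: number;

  // The clock is injectable so throughput can be checked deterministically.
  constructor(now: () => number = Date.now) {
    this.now = now;
    this.startedAtMs = now();
  }

  // Call once per published comment.
  record(source: string, isDuplicate: boolean): void {
    this.total += 1;
    if (isDuplicate) {
      this.duplicates += 1;
    }
    this.bySource[source] = (this.bySource[source] ?? 0) + 1;
  }

  // Computes the derived metrics from the raw counters.
  snapshot(): {
    total: number;
    duplicateRate: number;
    commentsPerSecond: number;
    bySource: Record<string, number>;
  } {
    const elapsedSec = (this.now() - this.startedAtMs) / 1000;
    return {
      total: this.total,
      duplicateRate: this.total === 0 ? 0 : this.duplicates / this.total,
      commentsPerSecond: elapsedSec > 0 ? this.total / elapsedSec : 0,
      bySource: { ...this.bySource },
    };
  }
}
```

Over a long run, the observed `duplicateRate` should approach the configured `PRODUCER_DUPLICATE_RATE`, which makes it a quick sanity check on the injector.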

Health check: The Producer has no HTTP server, so monitor it via:

  • Docker container status
  • Kafka topic message count
  • Consumer processing logs

Development

Run standalone:
cd producer
pnpm dev
Environment variables:
KAFKA_BROKER=localhost:9092
PRODUCER_MIN_DELAY=100
PRODUCER_MAX_DELAY=10000
PRODUCER_DUPLICATE_RATE=0.05
Stop gracefully:
# Sends SIGTERM, allows cleanup
docker stop kafka-producer

Testing the Producer

1. Watch Kafka Topic

docker exec -it kafka-broker kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic raw-comments \
  --from-beginning

2. Check Message Count

docker exec -it kafka-broker kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic raw-comments

3. Monitor Docker Logs

docker logs -f kafka-producer

Next Steps