(Figure: components of a typical log collection system)
1. **Log Collection Agent**

```go
// Watch the log file with the go-tail library
t, err := tail.TailFile("/var/log/app.log", tail.Config{Follow: true, ReOpen: true})
if err != nil {
    log.Fatal(err)
}
for line := range t.Lines {
    sendToKafka(line.Text)
}
```
```go
// Configure the Kafka producer (sarama)
config := sarama.NewConfig()
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer([]string{"kafka:9092"}, config)
if err != nil {
    log.Fatal(err)
}

// Send a log message
msg := &sarama.ProducerMessage{
    Topic: "applogs",
    Value: sarama.StringEncoder(logData),
}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
    log.Printf("send failed: %v", err)
} else {
    log.Printf("stored at partition %d, offset %d", partition, offset)
}
```
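The collector above calls `sendToKafka`, which the article leaves undefined. A minimal sketch, assuming the `producer` configured above lives at package level:

```go
// Hypothetical glue between the tail loop and the Kafka producer above
func sendToKafka(text string) {
    msg := &sarama.ProducerMessage{
        Topic: "applogs",
        Value: sarama.StringEncoder(text),
    }
    if _, _, err := producer.SendMessage(msg); err != nil {
        log.Printf("kafka send failed: %v", err)
    }
}
```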
2. **Message Queue Buffer**
- The Kafka cluster provides high-throughput buffering
- Partitioning enables horizontal scaling (see the keyed-message sketch after this list)
- Message persistence guarantees reliability
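To make the partitioning point concrete: sarama's default partitioner hashes the message key, so keying each message by its source keeps one service's logs ordered within a single partition while the topic scales across partitions. A minimal sketch, where `serviceName` is a hypothetical variable:

```go
// All messages with the same key hash to the same partition,
// preserving per-service ordering under horizontal scaling
msg := &sarama.ProducerMessage{
    Topic: "applogs",
    Key:   sarama.StringEncoder(serviceName), // hypothetical routing key
    Value: sarama.StringEncoder(logData),
}
```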
3. **Log Processing Engine**
```go
// Consume messages from Kafka
consumer, err := sarama.NewConsumer([]string{"kafka:9092"}, nil)
if err != nil {
    log.Fatal(err)
}
partitionConsumer, err := consumer.ConsumePartition("applogs", 0, sarama.OffsetNewest)
if err != nil {
    log.Fatal(err)
}
for msg := range partitionConsumer.Messages() {
    parsedLog := parseLog(msg.Value)
    storeToES(parsedLog)
}
```
```go
// Store a parsed log in a date-based Elasticsearch index (olivere/elastic);
// in production, create the client once and reuse it across calls
func storeToES(doc LogDocument) {
    client, err := elastic.NewClient()
    if err != nil {
        log.Fatal(err)
    }
    _, err = client.Index().
        Index("applogs-" + time.Now().Format("2006.01.02")).
        BodyJson(doc).
        Do(context.Background())
    if err != nil {
        log.Printf("indexing failed: %v", err)
    }
}
```
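The consumer relies on a `parseLog` function and a `LogDocument` type that the article does not define. A minimal sketch, assuming logs arrive as JSON and `encoding/json` is imported:

```go
// Hypothetical LogDocument shape and JSON-based parser
type LogDocument struct {
    Level   string `json:"level"`
    Message string `json:"message"`
    Time    string `json:"time"`
}

func parseLog(raw []byte) LogDocument {
    var doc LogDocument
    if err := json.Unmarshal(raw, &doc); err != nil {
        // Fall back to treating the payload as plain text
        doc = LogDocument{Level: "unknown", Message: string(raw)}
    }
    return doc
}
```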
**Concurrent processing model**
```go
// Process logs with a pool of five workers draining a buffered channel
jobs := make(chan LogEntry, 100)
for w := 1; w <= 5; w++ {
    go func(id int) {
        for entry := range jobs {
            processLog(entry)
        }
    }(w)
}
```
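The pool above has no shutdown path: if the process exits while entries sit in the buffer, they are lost. A sketch of a drain-and-wait variant using `sync.WaitGroup`, under the same assumed `jobs`, `LogEntry`, and `processLog`:

```go
// Close the channel once producers finish, then wait for the
// workers to drain every remaining buffered entry
var wg sync.WaitGroup
for w := 1; w <= 5; w++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        for entry := range jobs {
            processLog(entry)
        }
    }()
}
close(jobs) // no more log entries will be submitted
wg.Wait()   // blocks until the buffer is fully processed
```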
**Compressed log transport**
```go
// Compress the payload with S2, a Snappy-compatible codec
// from github.com/klauspost/compress/s2
var buf bytes.Buffer
writer := s2.NewWriter(&buf)
writer.Write([]byte(rawLog)) // errors elided; check both in production
writer.Close()               // Close flushes the compressed stream
send(buf.Bytes())
```
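For completeness, the receiving side reverses this. A sketch assuming the payload arrives as `compressed` and `io` is imported:

```go
// Decompress an S2 stream back into the raw log bytes
reader := s2.NewReader(bytes.NewReader(compressed))
rawLog, err := io.ReadAll(reader)
if err != nil {
    log.Printf("decompress failed: %v", err)
}
```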
**Checkpoint and resume**
```go
// Persist the consumer offset so processing can resume after a restart
func saveOffset(offset int64) error {
    return os.WriteFile("offset.dat", []byte(strconv.FormatInt(offset, 10)), 0644)
}
```
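A hypothetical companion for the restart path, falling back to the newest offset when no checkpoint file exists; the consumer can then resume with `consumer.ConsumePartition("applogs", 0, loadOffset())`:

```go
// Read the last saved offset at startup
func loadOffset() int64 {
    data, err := os.ReadFile("offset.dat")
    if err != nil {
        return sarama.OffsetNewest // no checkpoint yet
    }
    offset, err := strconv.ParseInt(string(data), 10, 64)
    if err != nil {
        return sarama.OffsetNewest
    }
    return offset
}
```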
**Memory control**
```go
// Cap the Go heap with a 2 GiB soft limit (runtime/debug, Go 1.19+)
debug.SetMemoryLimit(2 * 1024 * 1024 * 1024)

// Periodically return unused memory to the operating system
go func() {
    for {
        time.Sleep(30 * time.Second)
        debug.FreeOSMemory()
    }
}()
```
**Monitoring metrics**
```go
// Register a Prometheus counter, labeled by log level
logCount := prometheus.NewCounterVec(
    prometheus.CounterOpts{
        Name: "log_processed_total",
        Help: "Total processed logs",
    },
    []string{"level"},
)
prometheus.MustRegister(logCount)
```
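A sketch of the usage side, assuming the standard `promhttp` handler and an arbitrary port:

```go
// Count each processed log by level, then expose /metrics for scraping
logCount.WithLabelValues("info").Inc()

http.Handle("/metrics", promhttp.Handler())
go http.ListenAndServe(":9100", nil) // hypothetical metrics port
```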
Notes:
(The code examples in this article are simplified; production deployments require complete error handling.)