kafka操作_logagent实操一下

第一步当然是安装了
安装后启动 不需要修改配置啥的

传送门  按着这个操作就行

这个是文件目录

kafka.go

package kafka

import (
"fmt"
"github.com/Shopify/sarama"
)

var client sarama.SyncProducer

//初始化
func Init( addr []string)(err error) {
config := sarama.NewConfig()
// 等待服务器所有副本都保存成功后的响应
config.Producer.RequiredAcks = sarama.WaitForAll
// 随机的分区类型:返回一个分区器,该分区器每次选择一个随机分区
config.Producer.Partitioner = sarama.NewRandomPartitioner
// 是否等待成功和失败后的响应
config.Producer.Return.Successes = true

// 使用给定代理地址和配置创建一个同步生产者
client, err = sarama.NewSyncProducer([]string{"0.0.0.0:9092"}, config)
if err != nil {
return
}
return
}

//保存数据
func SendKafka(msgType,value string) {
//构建发送的消息,
msg := &sarama.ProducerMessage{
//Topic: "test",//包含了消息的主题
Partition: int32(10), //
Key: sarama.StringEncoder("key"), //
}

msg.Topic = msgType
//将字符串转换为字节数组
msg.Value = sarama.ByteEncoder(value)
//fmt.Println(value)
//SendMessage:该方法是生产者生产给定的消息
//生产成功的时候返回该消息的分区和所在的偏移量
//生产失败的时候返回error
partition, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("Send message Fail")
}
fmt.Printf("Partition = %d, offset=%d\n", partition, offset)
}

taillog.go

package taillog

import (
"fmt"
"github.com/hpcloud/tail"
)

var tails *tail.Tail
var LogChan chan string

func Init(filename string) (err error) {

tails, err = tail.TailFile(filename, tail.Config{
ReOpen: true, //重新打开文件
Follow: true, //是否跟随
// Location: &tail.SeekInfo{Offset: 0, Whence: 2}, //读文件的位置
MustExist: false, //日志不存在是否报错 false 不报错
Poll: true,
})
if err != nil {
fmt.Println("tail file err:", err)
return
}
return
}

//读
func ReadChan() (<-chan *tail.Line){
return tails.Lines
}

main.go

package main

import (
"DarkHorse/studygo/logagent/kafka"
"DarkHorse/studygo/logagent/taillog"
"fmt"
"time"
)
// bin\windows\kafka-console-consumer.bat --bootstrap-server=127.0.0.1:9092 --topic=001 --from-beginning
func run() {
for true {
select {
case line := <-taillog.ReadChan():
kafka.SendKafka("001",line.Text)
default:
time.Sleep(100 * time.Millisecond)
continue
}
}
}

func main() {
//初始化kafka
err := kafka.Init([]string{"0.0.0.0:9092"})
if err !=nil{
fmt.Println("kafka 启动失败",err)
}

//打开收集日志
taillog.Init(`E:\Go\src\DarkHorse\studygo\logagent\log\zz.log`)
if err !=nil{
fmt.Println("taillog 启动失败",err)
}

run()
}

zz.log 这个文件你就当是日志
运行后 在你 安装kafka的根目录找

最后再打包一份源码

logagent.zip 





伍先生
  • 职业: 程序员,产品
  • 码龄: 4.1
  • 技能: PHP Go 前端
  • 微信: JwCode
  • 公众号/小程序: 渐悟分享