一、下载安装java(已有java环境跳过)

安装步骤参考:https://blog.csdn.net/manongxianfeng/article/details/112641235

官网下载:
https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html

下载 jdk-8u281-linux-x64.tar.gz

mkdir /usr/java
cp jdk1.8.0_281 /usr/java/jdk1.8.0_281
vim /etc/profile

写入下面的内容

export JAVA_HOME=/usr/java/jdk1.8.0_281
export CLASSPATH=$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH

然后按 Esc,输入 :wq,保存修改并退出。

source /etc/profile

$ java -version

java version "1.8.0_271"
Java(TM) SE Runtime Environment (build 1.8.0_271-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.271-b13, mixed mode)

到此java安装完成

二、安装kafka

安装版本:

kafka_2.12-1.0.1.tgz

参考:

https://blog.csdn.net/bugzeroman/article/details/104456222

1.解压

a) 解压:tar -zxvf kafka_2.12-1.0.1.tgz
b) 移动到指定目录:mv kafka_2.12-1.0.1 /usr/local/kafka
c) cd /usr/local/kafka

2.配置系统变数量

vim /etc/profile

写入下面的内容

export PATH=export PATH=/usr/local/kafka/bin:$PATH/bin:$PATH

然后按 Esc,输入 :wq,保存修改并退出。

source /etc/profile

3.修改Kafka配置文件

首先创建本地数据存放的目录:

mkdir /home/kafka/kafka_2.12-1.0.1/data

vim config/server.properties

#服务器IP地址,修改为自己的服务器IP
host.name=192.168.200.200
#Kafka的topic内的数据本地保存目录
log.dirs=/home/kafka/kafka_2.12-1.0.1/data 

以下使用默认配置即可:

#如果搭建Kafka集群,需要保证唯一
broker.id=0
#端口号、记得开启端口,云服务器要开放安全组
port=9092
#zookeeper地址和端口, Kafka支持内置的Zookeeper和引用外部的Zookeeper
zookeeper.connect=localhost:2181
#topic的默认分区数
num.partitions=1
#Kafka接收的数据保存7天,之后会被删除
log.retention.hours=168
#数据最大为1G,超过需要分块

4.修改Zookeeper配置

首先创建本地数据存放的目录:

mkdir /home/kafka/kafka_2.12-1.0.1/zookeeper

vim config/zookeeper.properties
主要修改如下配置:

#Zookeeper数据本地存储目录
dataDir=/home/kafka/kafka_2.12-1.0.1/zookeeper

以下使用默认配置即可:

# Zookeeper对外提供服务的端口
clientPort=2181

5.启动Zookeeper

启动Kafka服务必须先启动Zookeeper服务,
使用如下命令后台启动自带的Zookeeper:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

6.启动Kafka

注意使用-daemon后台启动Kafka服务:

bin/kafka-server-start.sh -daemon config/server.properties

7.查看进程

看到如下进程,
一个是Kafka,另一个是Zookeeper:

ps aux |grep zookeeper
ps aux |grep kafka

三、验证安装kafka

1.创建一个名为testTopic的主题

kafka-topics.sh --create --topic testTopic --replication-factor 1 --partitions 1 --zookeeper localhost:2181

由于只有一个Kafka服务,
创建主题时指定副本和分区都是1个。

2.查看刚才创建的主题

haima@haima-PC:~/Desktop$ kafka-topics.sh --list --zookeeper localhost:2181
testTopic

3.使用producer生产者发送消息

注意使用上面配置的host.name连接的ip地址 192.168.200.200
而且Kafka服务端口号默认为9092,
否则有可能无法连接成功:

kafka-console-producer.sh --broker-list 192.168.200.200:9092 --sync --topic testTopic

然后输入要发送的消息:
hello wrold

4.使用consumer消费者查看刚才发送的消息

kafka-console-consumer.sh --zookeeper 192.168.200.200:2181 --topic testTopic --from-beginning

或者

kafka-console-consumer.sh --bootstrap-server 192.168.200.200:9092 --topic testTopic --from-beginning

Kafka官方推荐使用bootstrap-server方式连接。

5.删除一个Topic

这里先创建一个主题,再删除掉:

kafka-topics.sh --create --topic testDelete --zookeeper localhost:2181 --replication-factor 1 --partitions 1  
kafka-topics.sh --delete --topic testDelete --zookeeper localhost:2181  

注意修改vim config/server.properties配置文件,
delete.topic.enable设置为true才能真正删除topic:

#是否能够删除topic的开关,默认为false
delete.topic.enable=true

6.Kafka Debug方法

在kafka-run-class.sh增加如下参数:

KAFKA_DEBUG=true
DEBUG_SUSPEND_FLAG="y"

7.清除kafka数据

只要删除上面配置的两个本地存储目录即可:

rm -rf /home/kafka/kafka_2.12-1.0.1/data
rm -rf /home/kafka/kafka_2.12-1.0.1/zookeeper

三、logstash-6-4-0安装

1.下载软件

下载地址:

https://www.elastic.co/cn/downloads/past-releases/logstash-6-4-0

安装参考文档:

https://blog.csdn.net/weixin_33991727/article/details/92698794

mkdir -p /usr/local/logstash
cp logstash-6.4.0.tar.gz /usr/local/logstash/
cd /usr/local/logstash/
tar -zxvf logstash-6.4.0.tar.gz -C /usr/local/logstash/
cd logstash-6.4.0/

2.设置全局变量

vim /etc/profile

写入下面的内容

export LOGSTARSH_HOME=/usr/local/logstash/logstash-6.4.0
export PATH=$PATH:$LOGSTARSH_HOME/bin

然后按 Esc,输入 :wq,保存修改并退出。

source /etc/profile

常用命令

logstash --help
logstash --version

3.修改配置文件

vim config/datapro.conf

写入下面的内容

input{
  kafka {
    bootstrap_servers => "192.168.200.200:9092" #kafka地址
    topics => ["es_rules"]  #kafka topics
    group_id  => "logstash_queries_rules" #kafka group id,如果想要重新读取需要修改次选项
    auto_offset_reset => "earliest" #自动将偏移重置为最早的偏移量
    type => "es_rules"
  }
  kafka {
    bootstrap_servers => "192.168.200.200:9092" #kafka地址
    topics => ["es_queries"]  #kafka topics
    group_id  => "logstash_queries_rules" #kafka group id,如果想要重新读取需要修改次选项
    auto_offset_reset => "earliest" #自动将偏移重置为最早的偏移量
    type => "es_queries"
  }
  kafka {
    bootstrap_servers => "192.168.200.200:9092" #kafka地址
    topics => ["es_country_rules"]  #kafka topics
    group_id  => "logstash_queries_rules" #kafka group id,如果想要重新读取需要修改次选项
    auto_offset_reset => "earliest" #自动将偏移重置为最早的偏移量
    type => "es_country_rules"
  }
  kafka {
    bootstrap_servers => "192.168.200.200:9092" #kafka地址
    topics => ["es_rules"]  #kafka topics
    group_id  => "logstash_queries_rules" #kafka group id,如果想要重新读取需要修改次选项
    auto_offset_reset => "earliest" #自动将偏移重置为最早的偏移量
    type => "es_es_rules"
  }
  kafka {
    bootstrap_servers => "192.168.200.200:9092" #kafka地址
    topics => ["es_rules_delete"]  #kafka topics
    group_id  => "logstash_queries_rules" #kafka group id,如果想要重新读取需要修改次选项
    auto_offset_reset => "earliest" #自动将偏移重置为最早的偏移量
    type => "es_es_rules_delete"
  }
}

filter {
  json {
    source => "message"
  }
  mutate {
    remove_field => "@timestamp"
    remove_field => "message"
    remove_field => "@version"
  }
  date {
    match => [ "loginDat", "yyyy-MM-dd HH:mm:ss"]
  }
}

output {
  if [type] == "es_rules" {
    elasticsearch {
      hosts => "10.10.11.66:9200" #es地址
      index => "rules"
      document_type => "rule"
      document_id => "%{es_id}"
    }
  }
  else if [type] == "es_queries"{
    elasticsearch {
      hosts => "10.10.11.66:9200" #es地址
      index => "queries"
      document_type => "query"
      document_id => "%{es_id}"
    }
  }
  else if [type] == "es_country_rules"{
    elasticsearch {
      hosts => "10.10.11.66:9200"
      index => "country_rules" #es地址
      document_type => "rule"
      document_id => "%{es_id}"
    }
  }
  else if [type] == "es_es_rules" {
    elasticsearch {
      hosts => "10.10.11.66:9200" #es地址
      index => "es_rules"
      document_type => "rule"
      document_id => "%{es_id}"
    }
  }
  else if [type] == "es_rules_delete"{
    elasticsearch{
      action => "delete"
      hosts => "10.10.11.66:9200" #es地址
      index => "es_rules"
      document_type => "rule"
      document_id => "%{es_id}"
    }
  }
}

4.启动服务

#控制台启动,且输出日志
logstash -f config/datapro.conf

#后台启动
logstash -f config/datapro.conf &
作者:海马  创建时间:2023-03-18 09:28
最后编辑:海马  更新时间:2025-01-27 10:55