flume kafka sys

flume kafka sys,第1张

flume kafka sys

1.启动kafka
hadoop-
zkServer
kafka-server-start.sh /opt/soft/kafka200/config/server.properties


//查看数据量 mac:0:28
kafka-run-class.sh
kafka.tools.GetOffsetShell  
--broker-list 192.168.10.136:9092
--topic event_attendees --time -1

kafka-consumer-groups.sh
--bootstrap-server 192.168.10.136:9092
--group mac
--reset-offsets
--all-topics
--to-earliest
--execute


flume-ng agent -n event_attendees_raw -f /opt/fconf/event_attendees_raw.conf 

kafka-topics.sh --zookeeper 192.168.10.136:2181 --list
kafka-topics.sh --create --zookeeper 192.168.10.136:2181 --replication-factor 1 --partitions 1 --topic train    
kafka-console-consumer.sh --bootstrap-server 192.168.10.136:9092 -topic users --from-beginning
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.10.136:9092 --topic mydemo
kafka-consumer-groups.sh --bootstrap-server 192.168.10.136:9092 --group zbx --reset-offsets --all-topics --to-earliest --execute
kafka-topics.sh --create zookeeper 192.168.10.136:2181 replication-factor 1 partitions 1 topics mydemo
flume-ng agent --name a1 -f /opt/fconf/filetest03.conf


1.建立topic(消息队列)
kafka-topics.sh --create
--zookeeper 你的zookeeper的IP:2181             
--replication-factor 副本数
--partitions 分区数
--topic 消息队列名

2.检查队列是否创建成功
kafka-topics.sh 
--zookeeper 你的zookeeper的IP:2181  
--list

3.向你的消息队列中生产消息
kafka-console-producer.sh 
--topic 队列名
--broker-list 你的kafka队列的机器IP:9092

4.消费消息
kafka-console-consumer.sh 
--bootstrap-server 你的kafka的IP:9092
--topic 队列名

5.查看队列
kafka-topics.sh --zookeeper 192.168.10.136:2181 --list

6删除队列
kafka-topics.sh --zookeeper 192.168.10.136:2181 --delete --topic train

7上传
flume-ng agent --name train -f /opt/fconf/train

8查询数量
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.10.136:9092 --topic train -time  -1

9.监听监视器
kafka-console-consumer.sh --bootstrap-server 192.168.10.136:9092 -topic users --from-beginning

#!/bin/bash
my_start(){
    if [ $1 == "start" ];then
        #start hadoop
        sh /opt/soft/hadoop260/sbin/start-dfs.sh
        sh /opt/soft/hadoop260/sbin/start-yarn.sh
        #start hive
        nohup /opt/soft/hive110/bin/hive --service hiveserver2 &
        #start zeppelin
        sh /opt/soft/zeppelin081/bin/zeppelin-daemon.sh start
        echo "start over"
        
        # kafka  kafka-server-start.sh /opt/soft/kafka200/config/server.properties     
        #  kafka-topics.sh --zookeeper 192.168.10.136:2181 --list
        #  kafka-console-consumer.sh --bootstrap-server 192.168.10.136:9092 --topic event_attendees_raw --from-beginning
        # kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.10.136:9092 --topic event_attendees --time -1
        #kafka-topics.sh --zookeeper 192.168.10.136:2181 --create --topic demo --partitions 1 --replication-factor 1

        #kafka-console-consumer.sh --bootstrap-server 192.168.10.136:9092 --topic demo --from-beginning
        #kafka-console-producer.sh --broker-list 192.168.10.136:9092 --topic demo
        
        
#        flume-ng agent -n event_attendees_raw -f /opt/fconf/event_attendees_raw.conf 
            
        #先开zookeeper 后面再开hbase
        
        #flume#flume
        
        # flume  flume-ng agent -n uf -f /opt/fconf/uf.conf 
           
        # spark cd /opt/soft/spark234/sbin/  sh start-all.sh
        # cd bin/  ./spark-shell
        
        # start-hbase.sh   hbase shell
    else
        #close zeppelin
        sh /opt/soft/zeppelin081/bin/zeppelin-daemon.sh stop
        #close hive
        hiveprocess=`jps | grep RunJar | awk '{print $1}'`
        kill -9 $hiveprocess
        #stop hadoop
        sh /opt/soft/hadoop260/sbin/stop-dfs.sh
        sh /opt/soft/hadoop260/sbin/stop-yarn.sh
        echo "stop over"
    fi
}
my_start $1
 

#!/bin/bash

#修改系统名字 同时修改hosts文件
modify_sysname(){
    hostnamectl set-hostname $1
    #先获取hosts文件中对应的内容 如果没发现对应的内容才能添加这个地址
    cfg=`cat /etc/hosts | grep $2 | grep -wF $1`
    if [ "$cfg" == "" ];then
        #根据ip地址修改hosts文件
        echo "$2 $1" >> /etc/hosts
    else echo "do nothing"
    fi
}

#修改IP静态地址
modify_staticip(){
    #先检查文件是否已被修改过
    chk=`cat /etc/sysconfig/network-scripts/ifcfg-ens33 | grep static`
    if [ "$chk" == "" ];then
    #修改/etc/sysconfig/network-scripts/ifcfg-ens33中的dhcp
    sed -i 's/dhcp/static/' /etc/sysconfig/network-scripts/ifcfg-ens33
    echo "IPADDR=$1" >> /etc/sysconfig/network-scripts/ifcfg-ens33
    echo "NETMASK=255.255.255.0" >> /etc/sysconfig/network-scripts/ifcfg-ens33
    echo "GATEWAY=${1%.*}.2" >> /etc/sysconfig/network-scripts/ifcfg-ens33
    echo "DNS1=114.114.114.114" >> /etc/sysconfig/network-scripts/ifcfg-ens33
    echo "DNS2=8.8.8.8" >> /etc/sysconfig/network-scripts/ifcfg-ens33
    fi
    systemctl restart network
}

#关闭防火墙
close_firewalld(){
    systemctl stop firewalld
    systemctl disable firewalld
}

#修改yum源为阿里源
modify_yumsource(){
    #检查是否已有备份文件 如果有则说明已经做过了
    if [ -e /etc/yum.repos.d/CentOS-base.repo_bak ];then
        echo "do nothing"
    else
    #首先安装wget命令
    yum install -y wget vim
    #修改yum
    cd /etc/yum.repos.d/
    mv CentOS-base.repo CentOS-base.repo_bak
    wget -O /etc/yum.repos.d/CentOS-base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
    yum clean all
    yum makecache
    fi
}

#检查文件的文件夹是否存在 不存在就创建一个
check_soft_folder(){
    if [ -e /opt/soft/$1 ];then
        echo "/opt/soft/$1 folder already exists"
        return 0
    else
        mkdir -p /opt/soft/$1
        return 1
    fi
}

#安装JDK
setup_jdk(){
    check_soft_folder jdk180
    if [ $? == 1 ];then
        #在opt文件夹下搜索jdk的tar.gz文件
        jdkName=`ls /opt/ | grep jdk*`
        #将文件解压到对应的soft文件夹下
        tar -zxf /opt/$jdkName -C /opt/soft/jdk180 --strip-components 1
        #配置/etc/profile文件
        echo "" >> /etc/profile
        echo "#java environment" >> /etc/profile
        echo "export JAVA_HOME=/opt/soft/jdk180" >> /etc/profile
        echo "export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar" >> /etc/profile
        echo "export PATH=$PATH:$JAVA_HOME/bin" >> /etc/profile
        source /etc/profile
    fi
}

#安装mysql5.7
setup_mysql(){
    #检查linux的mariadb是否卸载  如果没有 说明没有安装过mysql
    mdb=`rpm -qa | grep mariadb`
    if [ "$mdb" != "" ];then
        rpm -e --nodeps $mdb
        cd /opt/
        wget -i -c http://dev.mysql.com/get/mysql57-community-release-el7-10.noarch.rpm
        yum -y install mysql57-community-release-el7-10.noarch.rpm
        yum -y install mysql-community-server
        #修改/etc/my.cnf文件解决中文乱码
        sed -i '/socket/a character-set-server=utf8' /etc/my.cnf
        echo "[client]" >> /etc/my.cnf
        echo "default-character-set=utf8" >> /etc/my.cnf
        echo "[mysql]" >> /etc/my.cnf
        echo "default-character-set=utf8" >> /etc/my.cnf
        systemctl start mysqld.service
        #获取临时密码
        pwdinfo=`grep "password" /var/log/mysqld.log | grep -w password`
        passwd=${pwdinfo#*localhost:}
        passwd=$(echo $passwd)
        #执行修改密码语句
        mysql -uroot -p"$passwd" --connect-expired-password -e "set global validate_password_policy=0"
        mysql -uroot -p"$passwd" --connect-expired-password -e "set global validate_password_length=1"
        mysql -uroot -p"$passwd" --connect-expired-password -e "ALTER USER 'root'@'localhost' IDENTIFIED BY 'okok'"
        echo "如下所示"
        echo "$passwd"
        #修改远程登录
        mysql -uroot -pokok -e "GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY 'okok'"
        mysql -uroot -pokok -e "flush privileges"
        #重启服务
        systemctl restart mysqld.service
    fi
}

#用expect配置全自动ssh免密
expect_ssh(){
    yum install -y expect
    echo "start!"
    myhost=$1
    expect /opt/myshell/nologin.sh $myhost
    echo "end!"
}

#安装hadoop
setup_hadoop(){
    check_soft_folder hadoop260
    if [ $? == 1 ];then
        #在opt文件夹下搜索hadoop的tar.gz文件
        hadoopName=`ls /opt/ | grep hadoop`
        #将文件解压到对应的soft文件夹下
        tar -zxf /opt/$hadoopName -C /opt/soft/hadoop260 --strip-components 1
        #配置/etc/profile文件
        echo "" >> /etc/profile
        echo "#hadoop environment" >> /etc/profile
        echo "export HADOOP_HOME=/opt/soft/hadoop260" >> /etc/profile
        echo "export HADOOP_MAPRED_HOME=$HADOOP_HOME" >> /etc/profile
        echo "export HADOOP_COMMON_HOME=$HADOOP_HOME" >> /etc/profile
        echo "export HADOOP_HDFS_HOME=$HADOOP_HOME" >> /etc/profile
        echo "export YARN_HOME=$HADOOP_HOME" >> /etc/profile
        echo "export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native" >> /etc/profile
        echo "export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin" >> /etc/profile
        echo "export HADOOP_INSTALL=$HADOOP_HOME" >> /etc/profile
        source /etc/profile
    fi
}

#配置4个site.xml和一个evn.sh
setup_xml(){
    cd /opt/soft/hadoop260/etc/hadoop/
    sed -i "//a nfs.defaultFSnhdfs://:9000nnnhadoop.tmp.dirn/opt/soft/hadoop260/tmpnnnhadoop.proxyuser.root.groupsn*nnnhadoop.proxyuser.root.hostsn*nnnhadoop.proxyuser.root.usersn*n" core-site.xml
    sed -i 's/$1//opt/soft/jdk180/g' core-site.xml
    sed -i '//a ndfs.replicationn1n' hdfs-site.xml
    sed -i '//a nyarn.resourcemanager.localhostnlocalhostnnnyarn.nodemanager.aux-servicesnmapreduce_shufflen' yarn-site.xml
    mv mapred-site.xml.template mapred-site.xml
    sed -i '//a nmapreduce.framework.namenyarnn' mapred-site.xml
    sed -i 's/${JAVA_HOME}//opt/soft/jdk180/g' hadoop-env.sh
    hadoop namenode -format
    /usr/bin/expect << EOF
spawn hadoop namenode -format
expect {
"(Y or N)" {send "Yr";exp_continue}
}
spawn start-all.sh
expect {
"(yes/no)?" {send "yesr";exp_continue}
"(yes/no)?" {send "yesr";exp_continue}
}
EOF
    
    jps
}

#安装hive
setup_hive(){
    check_soft_folder hive110
    if [ $? == 1 ];then
        #在opt文件夹下搜索hive的tar.gz文件
        hiveName=`ls /opt/ | grep hive`
        #将文件解压到对应的soft文件夹下
        tar -zxf /opt/$hiveName -C /opt/soft/hive110 --strip-components 1
        #配置/etc/profile文件
        echo "" >> /etc/profile
        echo "#hive environment" >> /etc/profile
        echo "export HIVE_HOME=/opt/soft/hive110" >> /etc/profile
        echo "export PATH=$PATH:$HIVE_HOME/bin" >> /etc/profile
        source /etc/profile
    fi
}

hive_xml_init(){
    cd /opt/soft/hive110/conf/
    touch hive-site.xml
    echo -e "nnnnhive.metastore.warehouse.dirn/hive110/warehousennnhive.metastore.localntruennnjavax.jdo.option.ConnectionURLnjdbc:mysql://127.0.0.1:3306/hive?createDatabaseIfNotExist=truennnjavax.jdo.option.ConnectionDriverNamencom.mysql.jdbc.Drivernnnjavax.jdo.option.ConnectionUserNamenrootnnnjavax.jdo.option.ConnectionPasswordnokoknn" >> hive-site.xml
    cp /opt/jar/mysql-connector-java-5.1.35.jar /opt/soft/hive110/lib
    schematool -dbType mysql -initSchema
}

#安装zeppelin
setup_zeppelin(){
    check_soft_folder zeppelin081
    if [ $? == 1 ];then
        #在opt文件夹下搜索zeppelin的tgz文件
        zeppelinName=`ls /opt/ | grep zeppelin`
        #将文件解压到对应的soft文件夹下
        tar -zxf /opt/$zeppelinName -C /opt/soft/zeppelin081 --strip-components 1
        #配置/etc/profile文件
        echo "" >> /etc/profile
        echo "#zeppelin environment" >> /etc/profile
        echo "export ZEPPELIN_HOME=/opt/soft/zeppelin081" >> /etc/profile
        echo "export PATH=$PATH:$ZEPPELIN_HOME/bin" >> /etc/profile
        source /etc/profile
        nohup hive --service hiveserver2 &
        jps
    fi
}
zeppelin_xml_init(){
    cd /opt/soft/zeppelin081/conf/
    cp zeppelin-site.xml.template zeppelin-site.xml
    sed -i '//a nzeppelin.helium.registrynheliumn' zeppelin-site.xml
    cp zeppelin-env.sh.template zeppelin-env.sh
    sed -i 's/#export JAVA_HOME=/export JAVA_HOME=/opt/soft/jdk180/' zeppelin-env.sh
    sed -i 's/#export HADOOP_CONF_DIR=/export HADOOP_CONF_DIR=/opt/soft/hadoop260/etc/hadoop/' zeppelin-env.sh
    zeppelin-daemon.sh start
    cp /opt/soft/hive110/conf/hive-site.xml /opt/soft/zeppelin081/conf/
    cp /opt/soft/hadoop260/share/hadoop/common/hadoop-common-2.6.0-cdh5.14.2.jar  /opt/soft/zeppelin081/interpreter/jdbc/
    cp /opt/soft/hive110/lib/hive-jdbc-1.1.0-cdh5.14.2-standalone.jar /opt/soft/zeppelin081/interpreter/jdbc/
    jps
}


#根据用户的选择进行对应的安装
custom_option(){
    case $1 in
        "1")
            modify_sysname $2 $3
            ;;
        "2")
            modify_staticip $3
            ;;
        "3")
            close_firewalld
            ;;
        "4")
            modify_yumsource
            ;;
        "5")
            setup_jdk
            ;;
        "6")
            setup_mysql
            ;;
        "7")
            expect_ssh $2
            ;;
        "8")
            setup_hadoop
            ;;
        "9")
            setup_xml $3
            ;;
        "10")
            setup_hive
            ;;
        "11")
            hive_xml_init
            ;;
        "12")
            setup_zeppelin
            ;;
        "13")
            zeppelin_xml_init
            ;;
        "all")
            modify_sysname $2 $3
            modify_staticip $3
            close_firewalld
            modify_yumsource
            setup_jdk
            setup_mysql
            expect_ssh $2
            setup_hadoop
            setup_xml $3
            setup_hive
            hive_xml_init
            setup_zeppelin
            zeppelin_xml_init
            ;;
        *)
        echo "please input 1~13 or all"
        ;;
    esac
}

#规定$1用户传入必须是IP地址 $2用户传入必须是系统的名称 $3用户安装软件选择
custom_option $3 $2 $1

欢迎分享,转载请注明来源:内存溢出

原文地址: https://www.outofmemory.cn/zaji/5686981.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存