【Spark】Reading data from Linux and storing the results on HDFS


Reading data from Linux and storing the results on HDFS
  • Preparation
  • Package and run
  • View the results

Preparation
  • Create the test data on Linux (a sample is sketched right after this list)
  • Make sure Hadoop and Spark are running
  • Write the code — a simple word count, shown below
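The original screenshots of the data are not reproduced here. As a hypothetical stand-in, any small text file of space-separated words at /home/data/wc will do, and jps can confirm that the Hadoop daemons are up:

echo "hello spark hello hadoop hello world" > /home/data/wc
jps    # should list NameNode, DataNode, and the other Hadoop/Spark daemons

With the data in place, here is the word-count code: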
package WC

import org.apache.spark.sql.SparkSession

object WordCount {
  def main(args: Array[String]): Unit = {
    // Build a local SparkSession with 6 threads; the master set here
    // takes precedence over anything passed to spark-submit
    val spark = SparkSession.builder()
      .master("local[6]")
      .appName("Wc")
      .getOrCreate()

    // Read the local Linux file, split each line into words,
    // and count the occurrences of every word
    val countsRDD = spark.sparkContext.textFile("file:///home/data/wc")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

    // Write the result to HDFS as text files
    countsRDD.saveAsTextFile("hdfs://192.168.23.37:9000/wc")

    spark.stop()
  }
}
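One caveat: saveAsTextFile throws a FileAlreadyExistsException when the target directory already exists, so re-running the job against hdfs://192.168.23.37:9000/wc fails until that directory is removed. A minimal sketch of a pre-delete with the Hadoop FileSystem API (a hypothetical addition, not part of the original code) could go right before the save:

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical: remove the old output directory (if any) so the job can be re-run
val outputUri = "hdfs://192.168.23.37:9000/wc"
val fs = FileSystem.get(new URI(outputUri), spark.sparkContext.hadoopConfiguration)
fs.delete(new Path(outputUri), true) // recursive delete; returns false when the path is absent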
Package and run

Select the main class to run.

To keep the jar from getting too large, you can delete the extra jars and keep only the "xxx compile output" entry; the other jars are only there so the program can run in a Windows environment and are not needed on Linux.

Make a note of the address shown here (the fully qualified main class, WC.WordCount); you will need it when running the job.

Start packaging.

Transfer the jar to the cluster to run it. Remember to run from the spark/bin directory, and enter the following command:
./spark-submit --class WC.WordCount /home/data/Spark_trip.jar
The first argument is the address you saw when packaging, i.e. the fully qualified main class WC.WordCount; the second is the path to the jar file.
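If you are unsure of the main-class path, a generic JDK command (not from the original post) can list the jar's contents and confirm it:

jar tf /home/data/Spark_trip.jar | grep WordCount

For a Scala object you would expect to see WC/WordCount.class and WC/WordCount$.class in the output.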

View the results

The run log is as follows:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
22/01/07 17:07:47 INFO SparkContext: Running Spark version 2.0.0
22/01/07 17:07:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/01/07 17:07:47 INFO SecurityManager: Changing view acls to: root
22/01/07 17:07:47 INFO SecurityManager: Changing modify acls to: root
22/01/07 17:07:47 INFO SecurityManager: Changing view acls groups to: 
22/01/07 17:07:47 INFO SecurityManager: Changing modify acls groups to: 
22/01/07 17:07:47 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
22/01/07 17:07:48 INFO Utils: Successfully started service 'sparkDriver' on port 36931.
22/01/07 17:07:48 INFO SparkEnv: Registering MapOutputTracker
22/01/07 17:07:48 INFO SparkEnv: Registering BlockManagerMaster
22/01/07 17:07:48 INFO DiskBlockManager: Created local directory at /usr/local/src/spark/blockmgr-e2b440f5-1fd3-442f-b337-e5f763232f3b
22/01/07 17:07:48 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
22/01/07 17:07:48 INFO SparkEnv: Registering OutputCommitCoordinator
22/01/07 17:07:48 INFO Utils: Successfully started service 'SparkUI' on port 4040.
22/01/07 17:07:48 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.3.60:4040
22/01/07 17:07:48 INFO SparkContext: Added JAR file:/home/data/Spark_trip.jar at spark://192.168.3.60:36931/jars/Spark_trip.jar with timestamp 1641546468489
22/01/07 17:07:48 INFO Executor: Starting executor ID driver on host localhost
22/01/07 17:07:48 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 34392.
22/01/07 17:07:48 INFO NettyBlockTransferService: Server created on 192.168.3.60:34392
22/01/07 17:07:48 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.3.60, 34392)
22/01/07 17:07:48 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.3.60:34392 with 366.3 MB RAM, BlockManagerId(driver, 192.168.3.60, 34392)
22/01/07 17:07:48 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.3.60, 34392)
22/01/07 17:07:48 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
22/01/07 17:07:48 INFO SharedState: Warehouse path is 'file:/usr/local/src/spark/bin/spark-warehouse'.
22/01/07 17:07:49 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 128.0 KB, free 366.2 MB)
22/01/07 17:07:49 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 14.6 KB, free 366.2 MB)
22/01/07 17:07:49 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.3.60:34392 (size: 14.6 KB, free: 366.3 MB)
22/01/07 17:07:49 INFO SparkContext: Created broadcast 0 from textFile at WordCount.scala:11
22/01/07 17:07:49 INFO FileInputFormat: Total input paths to process : 1
22/01/07 17:07:59 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
22/01/07 17:07:59 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
22/01/07 17:07:59 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
22/01/07 17:07:59 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
22/01/07 17:07:59 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
22/01/07 17:07:59 INFO SparkContext: Starting job: saveAsTextFile at WordCount.scala:15
22/01/07 17:07:59 INFO DAGScheduler: Registering RDD 3 (map at WordCount.scala:13)
22/01/07 17:07:59 INFO DAGScheduler: Got job 0 (saveAsTextFile at WordCount.scala:15) with 2 output partitions
22/01/07 17:07:59 INFO DAGScheduler: Final stage: ResultStage 1 (saveAsTextFile at WordCount.scala:15)
22/01/07 17:07:59 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
22/01/07 17:07:59 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
22/01/07 17:07:59 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at map at WordCount.scala:13), which has no missing parents
22/01/07 17:07:59 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.5 KB, free 366.2 MB)
22/01/07 17:07:59 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.7 KB, free 366.2 MB)
22/01/07 17:07:59 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.3.60:34392 (size: 2.7 KB, free: 366.3 MB)
22/01/07 17:07:59 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1012
22/01/07 17:07:59 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at map at WordCount.scala:13)
22/01/07 17:07:59 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
22/01/07 17:08:00 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0, PROCESS_LOCAL, 5408 bytes)
22/01/07 17:08:00 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1, PROCESS_LOCAL, 5408 bytes)
22/01/07 17:08:00 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
22/01/07 17:08:00 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
22/01/07 17:08:00 INFO Executor: Fetching spark://192.168.3.60:36931/jars/Spark_trip.jar with timestamp 1641546468489
22/01/07 17:08:00 INFO TransportClientFactory: Successfully created connection to /192.168.3.60:36931 after 24 ms (0 ms spent in bootstraps)
22/01/07 17:08:00 INFO Utils: Fetching spark://192.168.3.60:36931/jars/Spark_trip.jar to /usr/local/src/spark/spark-d7d40a9c-05f7-4c93-95ba-e679fe12e427/userFiles-99d05970-cc3a-4d8e-87f6-39517c091b8d/fetchFileTemp3138355623141125753.tmp
22/01/07 17:08:00 INFO Executor: Adding file:/usr/local/src/spark/spark-d7d40a9c-05f7-4c93-95ba-e679fe12e427/userFiles-99d05970-cc3a-4d8e-87f6-39517c091b8d/Spark_trip.jar to class loader
22/01/07 17:08:00 INFO HadoopRDD: Input split: file:/home/data/wc:19+19
22/01/07 17:08:00 INFO HadoopRDD: Input split: file:/home/data/wc:0+19
22/01/07 17:08:00 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1575 bytes result sent to driver
22/01/07 17:08:00 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1575 bytes result sent to driver
22/01/07 17:08:00 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 242 ms on localhost (1/2)
22/01/07 17:08:00 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 270 ms on localhost (2/2)
22/01/07 17:08:00 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
22/01/07 17:08:00 INFO DAGScheduler: ShuffleMapStage 0 (map at WordCount.scala:13) finished in 0.299 s
22/01/07 17:08:00 INFO DAGScheduler: looking for newly runnable stages
22/01/07 17:08:00 INFO DAGScheduler: running: Set()
22/01/07 17:08:00 INFO DAGScheduler: waiting: Set(ResultStage 1)
22/01/07 17:08:00 INFO DAGScheduler: failed: Set()
22/01/07 17:08:00 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at WordCount.scala:15), which has no missing parents
22/01/07 17:08:00 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 65.8 KB, free 366.1 MB)
22/01/07 17:08:00 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 23.7 KB, free 366.1 MB)
22/01/07 17:08:00 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.3.60:34392 (size: 23.7 KB, free: 366.3 MB)
22/01/07 17:08:00 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1012
22/01/07 17:08:00 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at WordCount.scala:15)
22/01/07 17:08:00 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
22/01/07 17:08:00 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, partition 0, ANY, 5203 bytes)
22/01/07 17:08:00 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, partition 1, ANY, 5203 bytes)
22/01/07 17:08:00 INFO Executor: Running task 1.0 in stage 1.0 (TID 3)
22/01/07 17:08:00 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
22/01/07 17:08:00 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
22/01/07 17:08:00 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 4 ms
22/01/07 17:08:00 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 2 blocks
22/01/07 17:08:00 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 3 ms
22/01/07 17:08:00 INFO FileOutputCommitter: Saved output of task 'attempt_202201071707_0001_m_000001_3' to hdfs://192.168.23.37:9000/wc/_temporary/0/task_202201071707_0001_m_000001
22/01/07 17:08:00 INFO SparkHadoopMapRedUtil: attempt_202201071707_0001_m_000001_3: Committed
22/01/07 17:08:00 INFO Executor: Finished task 1.0 in stage 1.0 (TID 3). 1808 bytes result sent to driver
22/01/07 17:08:00 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 327 ms on localhost (1/2)
22/01/07 17:08:01 INFO FileOutputCommitter: Saved output of task 'attempt_202201071707_0001_m_000000_2' to hdfs://192.168.23.37:9000/wc/_temporary/0/task_202201071707_0001_m_000000
22/01/07 17:08:01 INFO SparkHadoopMapRedUtil: attempt_202201071707_0001_m_000000_2: Committed
22/01/07 17:08:01 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 1721 bytes result sent to driver
22/01/07 17:08:01 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 725 ms on localhost (2/2)
22/01/07 17:08:01 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
22/01/07 17:08:01 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at WordCount.scala:15) finished in 0.727 s
22/01/07 17:08:01 INFO DAGScheduler: Job 0 finished: saveAsTextFile at WordCount.scala:15, took 1.261376 s
22/01/07 17:08:01 INFO SparkContext: Invoking stop() from shutdown hook
22/01/07 17:08:01 INFO SparkUI: Stopped Spark web UI at http://192.168.3.60:4040
22/01/07 17:08:01 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/01/07 17:08:01 INFO MemoryStore: MemoryStore cleared
22/01/07 17:08:01 INFO BlockManager: BlockManager stopped
22/01/07 17:08:01 INFO BlockManagerMaster: BlockManagerMaster stopped
22/01/07 17:08:01 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/01/07 17:08:01 INFO SparkContext: Successfully stopped SparkContext
22/01/07 17:08:01 INFO ShutdownHookManager: Shutdown hook called
22/01/07 17:08:01 INFO ShutdownHookManager: Deleting directory /usr/local/src/spark/spark-d7d40a9c-05f7-4c93-95ba-e679fe12e427

Now you can view the output files on HDFS.

We use the wc -l command to check the number of lines in the output file.
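The original post showed this step with screenshots; the standard HDFS shell commands for it (paths taken from the job above) would be:

hdfs dfs -ls hdfs://192.168.23.37:9000/wc
hdfs dfs -cat hdfs://192.168.23.37:9000/wc/part-*
hdfs dfs -cat hdfs://192.168.23.37:9000/wc/part-* | wc -l

Since the job ran with 2 output partitions ("Got job 0 ... with 2 output partitions" in the log), the /wc directory should contain part-00000, part-00001, and a _SUCCESS marker, and each line of the part files is a (word,count) pair.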

Feel free to share; when reposting, please credit the source: 内存溢出 (outofmemory.cn).

Original link: http://www.outofmemory.cn/zaji/5698632.html
