课程介绍与笔记
1.HDFS shell
1.0查看帮助
hadoop fs -help
1.1上传
hadoop fs -put <linux上文件><hdfs上的路径>
1.2查看文件内容
hadoop fs -cat <hdfs上的路径>
1.3查看文件列表
hadoop fs -ls /
1.4下载文件
hadoop fs -get <hdfs上的路径><linux上文件>
2.使用java接口操作HDFS
见eclipse工程下的demo
3.hadoop通信机制
不同进程之间的方法进行调用
4.HDFS源码分析
FileSystem.get --> 通过反射实例化了一个DistributedFileSystem --> new DFSCilent()把他作为自己的成员变量
在DFSClient构造方法里面,调用了createNamenode,使用了RPC机制,得到了一个NameNode的代理对象,就可以和NameNode进行通信了
FileSystem --> DistributedFileSystem --> DFSClient --> NameNode的代理
1.执行MR的命令:
hadoop jar <jar在linux的路径><main方法所在的类的全类名><参数>
例子:
hadoop jar /root/wc1.jar cn.itcast.d3.hadoop.mr.WordCount hdfs://itcast:9000/words /out2
2.MR执行流程
(1).客户端提交一个mr的jar包给JobClient(提交方式:hadoop jar ...)
(2).JobClient通过RPC和JobTracker进行通信,返回一个存放jar包的地址(HDFS)和jobId
(3).client将jar包写入到HDFS当中(path = hdfs上的地址 + jobId)
(4).开始提交任务(任务的描述信息,不是jar, 包括jobid,jar存放的位置,配置信息等等)
(5).JobTracker进行初始化任务
(6).读取HDFS上的要处理的文件,开始计算输入分片,每一个分片对应一个MapperTask
(7).TaskTracker通过心跳机制领取任务(任务的描述信息)
(8).下载所需的jar,配置文件等
(9).TaskTracker启动一个java child子进程,用来执行具体的任务(MapperTask或ReducerTask)
(10).将结果写入到HDFS当中
1.实现分区的步骤:
1.1先分析一下具体的业务逻辑,确定大概有多少个分区
1.2首先书写一个类,它要继承org.apache.hadoop.mapreduce.Partitioner这个类
1.3重写public int getPartition这个方法,根据具体逻辑,读数据库或者配置返回相同的数字
1.4在main方法中设置Partioner的类,job.setPartitionerClass(DataPartitioner.class);
1.5设置Reducer的数量,job.setNumReduceTasks(6);
2.排序MR默认是按key2进行排序的,如果想自定义排序规则,被排序的对象要实现WritableComparable接口,在compareTo方法中实现排序规则,然后将这个对象当做k2,即可完成排序
3.combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。
4.MR启动流程
start-mapred.sh --> hadoop-daemon.sh --> hadoop --> org.apache.hadoop.mapred.JobTracker
Jobtracker调用顺序:main --> startTracker --> new JobTracker 在其构造方法中首先创建一个调度器,接着创建一个RPC的server(interTrackerServer)tasktracker会通过PRC机制与其通信
然后调用offerService方法对外提供服务,在offerService方法中启动RPC server,初始化jobtracker,调用taskScheduler的start方法 --> eagerTaskInitializationListener调用start方法,
--> 调用jobInitManagerThread的start方法,因为其是一个线程,会调用JobInitManager的run方法 --> jobInitQueue任务队列去取第一个任务,然后把它丢入线程池中,然后调用-->InitJob的run方法
--> jobTracker的initJob方法 --> JobInProgress的initTasks --> maps = new TaskInProgress[numMapTasks]和reduces = new TaskInProgress[numReduceTasks];
TaskTracker调用顺序:main --> new TaskTracker在其构造方法中调用了initialize方法,在initialize方法中调用RPC.waitForProxy得到一个jobtracker的代理对象
接着TaskTracker调用了本身的run方法,--> offerService方法 --> transmitHeartBeat返回值是(HeartbeatResponse)是jobTracker的指令,在transmitHeartBeat方法中InterTrackerProtocol调用了heartbeat将tasktracker的状态通过RPC机制发送给jobTracker,返回值就是JobTracker的指令
heartbeatResponse.getActions()得到具体的指令,然后判断指令的具体类型,开始执行任务
addToTaskQueue启动类型的指令加入到队列当中,TaskLauncher又把任务加入到任务队列当中,--> TaskLauncher的run方法 --> startNewTask方法 --> localizeJob下载资源 --> launchTaskForJob开始加载任务 --> launchTask --> runner.start()启动线程; --> TaskRunner调用run方法 --> launchJvmAndWait启动java child进程
1.上传zk安装包
2.解压
3.配置(先在一台节点上配置)
3.1添加一个zoo.cfg配置文件
$ZOOKEEPER/conf
mv zoo_sample.cfg zoo.cfg
3.2修改配置文件(zoo.cfg)
dataDir=/itcast/zookeeper-3.4.5/data
server.1=itcast05:2888:3888
server.2=itcast06:2888:3888
server.3=itcast07:2888:3888
3.3在(dataDir=/itcast/zookeeper-3.4.5/data)创建一个myid文件,里面内容是server.N中的N(server.2里面内容为2)
echo "1" > myid
3.4将配置好的zk拷贝到其他节点
scp -r /itcast/zookeeper-3.4.5/ itcast06:/itcast/
scp -r /itcast/zookeeper-3.4.5/ itcast07:/itcast/
3.5注意:在其他节点上一定要修改myid的内容
在itcast06应该讲myid的内容改为2 (echo "6" > myid)
在itcast07应该讲myid的内容改为3 (echo "7" > myid)
4.启动集群
分别启动zk
./zkServer.sh start
2015年传智播客大数据hadoop教程8天
欢迎光临 吾爱编程 (http://www.52pg.net/) | Powered by Discuz! X3.2 |