In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
This article introduces the relevant knowledge of "what are the characteristics of Hadoop". In the operation of actual cases, many people will encounter such a dilemma. Then let the editor lead you to learn how to deal with these situations. I hope you can read it carefully and be able to achieve something!
1 introduction to Hadoop
1.1 Origin of Hadoop
Data capacity
In the era of big data, the amount of data is huge, and the data has the following characteristics:
Volume (mass)
Velocity (high speed)
Variety (diverse)
Value (low value density)
The previous storage methods and analysis methods now do not work! Hadoop is used to solve the problems of storage and analysis and calculation of massive data. When founder Doug Cutting created Hadoop, the main source of thought was the three carriages of Google.
The first GFS produced the HDFS.
The second MapReduce produced the MR.
The third BigTable produced the HBase.
Hadoop now generally refers to the broad concept of Hadoop biosphere, as follows:
Big data's knowledge system
1.2 Hadoop featur
1.2.1 characteristics of Hadoop
High availability
The underlying layer of Hadoop maintains multiple copies of the same data, even if there is a problem with a computing element or storage in Hadoop, it will not cause data loss.
High expansion
By assigning task data among clusters, you can easily expand and delete multiple nodes. For example, Meituan nodes are located in 3K~5k nodes.
High efficiency
Under the idea of MapReduce, Hadoop works in parallel to speed up the processing of tasks.
High fault tolerance
If a subtask is too slow or the task fails, Hadoop will have a response policy and automatically retry and assign the task.
1.2.2 Hadoop architecture design
Hadoop 1.x is quite different from 2.x. 2.x mainly decouples the resource scheduling tasks in 1.x MapReduce to be managed by Yarn.
1.x and 2.x change
HDFS
Hadoop Distributed File System, abbreviated as HDFS, is a distributed file system. HDFS has high fault tolerance and is designed to be deployed on low-cost hardware to provide high-throughput access to application data. It is suitable for applications with large data sets.
MapReduce
MapReduce is a programming model that includes Map (mapping) and Reduce (reduction). You can think of it as the deep thought of merging and sorting.
Yarn
Apache Hadoop YARN (Yet Another Resource Negotiator, another resource coordinator) is a new Hadoop resource manager, which is a general resource management system, which can provide unified resource management and scheduling for upper-level applications. Its introduction has brought great benefits to the cluster in terms of utilization, unified resource management and data sharing.
Common component
Log component.
Unique RPC system ipc, Istroke O system, serialization, compression.
Configuration file conf.
Public method classes, such as checkSum parity.
2 HDFS
Background:
With the increase of the amount of data, the data can not be stored in one OS disk, so it is necessary to distribute the data to multiple OS managed disks. In order to manage disk files under multiple OS, there is an urgent need for a system to manage files on multiple machines, which is a distributed file management system. HDFS locates files through a directory tree. Note that HDFS is only one of the distributed file systems.
2.1 advantages and disadvantages of HDFS
2.1.1 benefits
High fault tolerance
Multiple copies of the data are automatically saved, default to 3, and fault tolerance is improved by adding copies.
The system automatically recovers when a copy is lost.
High scalability
The size of HDFS clusters can be dynamically scaled.
Suitable for big data to deal with
The scale of the data reaches the GB/TB/PB level.
The size of the document is more than one million.
Streaming access, which can ensure the consistency of data.
Low-cost, low-cost deployment of cheap machines improves commercialization.
Unified external interface, Hadoop itself is written in Java, but applications based on this can be written in other languages.
2.1.1 shortcomings
Unable to achieve low latency
Hadoop optimizes high throughput at the expense of the delay in obtaining data, such as the inability to obtain data in milliseconds on Hadoop.
Not suitable for storing a large number of small files
If you store a large number of small files, it will take up a lot of memory in NameNode to store files, directories, and block information. Therefore, the total number of files that the file system can store is limited by the memory capacity of the NameNode, and as a rule of thumb, the storage information for each file, directory, and data block is about 150 bytes.
The seek time of the small file storage exceeds the read time, which violates the design goal of HDFS.
Unable to modify file
For files uploaded to HDFS, modification of files is not supported, only appends are supported. HDFS is suitable for scenarios where you write once and read multiple times.
Cannot write concurrently
HDFS does not support multiple users performing write operations at the same time, that is, only one user can perform write operations at a time.
2.2 HDFS component architecture
2.2.1 Client
The client mainly has the following functions:
File segmentation, when the file is uploaded to HDFS, Client will split the file into a Block, and then store it.
Interact with NameNode to get the location information of the file.
Interact with DataNode, read or write data.
Client provides commands to manage HDFS, such as starting or shutting down HDFS.
Client can access HDFS through some commands.
2.2.2 NameNode
NameNode is referred to as NN, which is the Master in HDFS. It is a manager and has the following functions:
Manage the namespaces of the HDFS.
Configure replica Policy
Handle client read and write requests.
Manage block (Block) mapping information.
Mapping information: NameNode (file path, number of copies, {Block1,Block2}, [Block1: [three replica paths], Block2: [three replica paths]])
2.2.3 DataNode
DataNode abbreviated as DN means that the Slave,NameNode in the HDFS cluster is responsible for issuing commands, and the DataNode performs the actual operation.
Stores the actual blocks of data.
Perform read / write operations on the data block.
As mentioned above, the data directory information is stored in NN, while the specific information is stored in DN. The vivid analogy is as follows.
Comparison between NN and DN
The working Mechanism of DataNode
Data blocks are stored on disk information including data + data length + checksum + timestamp.
After DataNode starts, it registers with NameNode and reports all block information to NameNode periodically (1 hour).
The heartbeat between NN and DN beats every 3 seconds, and the heartbeat returns the result with the command given by NameNode to the DataNode, such as copying block data to another machine, or deleting a block. If you do not receive a heartbeat from a DataNode for more than 10 minutes, the node is considered unavailable.
Some machines can be safely added and exited while the cluster is running.
DataNode ensures data integrity
When DataNode reads Block, it calculates CheckSum.
If the calculated CheckSum is different from the value when the Block was created, the Block has been corrupted.
Client reads Block on other DataNode.
DataNode periodically validates CheckSum after its file is created
After the DN process dies or is unable to communicate with the NN, NN will not immediately sentence DN to death, usually after 10 minutes + 30 seconds.
2.2.4 Secondary NameNode
When NameNode dies, it does not immediately replace NameNode and provide services. Automatic switching needs to be realized by means of HA and so on. SNN mainly provides the following functions:
Assist NameNode and share its workload.
Merge Fsimage and Edits periodically and push them to NameNode.
In case of emergency, the recovery of NameNode can be assisted.
2.2.5 Block
The files in HDFS are physically stored in chunked Block, and in version 1.x, block = 64m _ 2.x, block = 128m. The bigger the block, the better, nor the smaller the better. Because the user obtains the data information time = the addressing block time + the disk transfer time.
If the block is too small, it will increase the addressing time, and most of the program time is spent on addressing.
If the speed is too large, the disk transfer time will be significantly longer than the addressing time, and the program will be slow to process block data.
2.3 HDFS Writing process
2.3.1 specific writing process
Writing process
The client requests the NameNode to upload the file through the Distributed FileSystem module, and the NameNode checks whether the target file already exists and the parent directory exists.
NameNode returns whether it can be uploaded.
The client requests which DataNode servers the first Block is uploaded to.
NameNode returns three DataNode nodes, namely dn1, dn2, and dn3.
The client requests dn1 to upload data through the FSDataOutputStream module, and dn1 will continue to call dn2 when it receives the request, and then dn2 will call dn3 to complete the communication pipeline.
Dn1, dn2, dn3 reply the client step by step.
The client starts uploading the first Block to dn1 (first reading data from disk to a local memory cache). In units of Packet, dn1 receives a Packet and passes it to dn2,dn2 and passes it to dn3;dn1. Each packet is put into a response queue to wait for a reply.
When a Block transfer is complete, the client again requests NameNode to upload the server of the second Block. (repeat steps 3-7).
2.3.2 Node distance calculation
In the process of writing data by HDFS, NameNode will select the DataNode that is closest to the data to be uploaded to receive the data.
Nearest distance = the sum of the distances of the two nodes to the nearest common ancestor.
Node distance calculation
Processes on the same node as Distance (/ d1qqr1pxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
Distance (/ d1qxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
Distance (/ d1pact r2pm n0marqqd1ax r3Unique N2) = 4 different rack nodes in the same data center
Distance (/ d1qpxr2qn1mag pxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx) = 6 different data centers
2.3.3 replica node selection
The first copy is on the node where the Client resides. If you are outside the cluster, choose one at random.
The second copy is on a different node from the first copy on the same rack.
The third part is located in different racks, random nodes.
Rack awareness
2.4 HDFS read proc
Reading process
The client requests NameNode to download the file through Distributed FileSystem, and NameNode finds the DataNode address where the file block is located by querying metadata.
Pick a DataNode (nearest principle, then random) server and request to read the data.
DataNode begins to transfer data to the client (reading the data input stream from disk and verifying it in Packet units).
The client receives it in Packet units, caches it locally, and then writes to the target file.
2.5 NameNode and Secondary NameNode
2.5.1 how NN and 2NN work
It is not convenient to read and write metadata in NameNode when it is saved to disk separately. When stored separately in memory, the power outage will be lost. Hadoop uses the following methods.
FsImage:
Where the metadata is serialized and stored on disk. Contains inode serialization information for all directories and files of the HDFS file system.
Memory:
A place where metadata is stored in memory.
Edit file:
Edit records every step of the client updating metadata information (metadata can be calculated by Edits).
The metadata is updated and added, and the metadata modification is appended to Edits and then the metadata in memory is modified, so that once the NameNode node is powered off, the metadata is generated by the combination of FsImage and Edits.
The Edits file should not be too large. The system will merge FsImage and Edits regularly by Secondary Namenode.
Working mechanism of NN and 2NN
Phase I: NameNode startup
After starting NameNode formatting for the first time, create Fsimage and Edits files. If it is not the first time to start, load the editing log and image file directly into memory.
Client requests for additions, deletions and modifications to metadata.
NameNode records the operation log and updates the scrolling log.
NameNode adds, deletes and modifies the data in memory.
The second phase: Secondary NameNode work
Secondary NameNode asks if NameNode needs CheckPoint. Bring back NameNode directly to see if the result is checked. Generally speaking, the following conditions can be met arbitrarily:
CheckPoint is executed once an hour by default.
Check the number of Edits file operations once a minute, reaching the threshold CheckPoint.
Secondary NameNode requests execution of CheckPoint.
NameNode scrolls the Edits log that is being written.
Copy the edit log edit _ 001 and the mirror file FsImage before scrolling to Secondary NameNode.
Secondary NameNode loads edit logs and image files into memory and merges them.
Generate a new image file FsImage.chkpoint.
Copy FsImage.chkpoint to NameNode.
NameNode renamed FsImage.chkpoint to FsImage.
2.6 Security Mode
When NameNode starts up, the system enters safe mode (read-only). If 99.9% of the blocks in the entire file system meet the minimum copy, NameNode will exit safe mode after 30 seconds.
2.6.1 NameNode launch
Load the FsImage file into memory and then perform various operations of the Edits file, and finally generate a complete metadata image in memory.
Create a new FsImage with an empty Edits file.
NameNode starts listening to DataNode.
The entire process NameNode has been running in safe mode, and NameNode is read-only to Client.
2.6.2 DataNode launch
System block locations are not maintained by NameNode, but are stored in DataNode as a list of blocks.
In safe mode, DataNode sends the latest block list information to NameNode, which promotes NameNode to run efficiently.
All block location mapping information is retained in NameNode memory during normal operation.
2.7 HDFS-HA
There is a single point of failure (SPOF) in NameNode in HDFS cluster. In order to implement High Available, it actually includes HDFS-HA and YARN-HA. HDFS can solve the above problems by configuring two NameNodes of Active/Standby to achieve hot backup of NameNode in the cluster. If there is a failure, such as a machine crash or a machine that needs to be upgraded and maintained, NameNode can be quickly switched to another machine. The realization of HA function mainly depends on ZooKeeper and ZKFC processes.
HA failover
2.7.1 key points of HDFS-HA work
The management of metadata needs to be changed.
Each holds a copy of the metadata in memory.
Edits logs can only be written to NameNode nodes in Active state.
Both NameNode can read Edits.
Shared Edits is managed in a shared storage (qjournal or NFS).
A state management function module is required
A ZKFC is implemented, which resides in the node where each namenode is located, and each ZKFC is responsible for monitoring its own NameNode node and using zk for status identification. When state switching is needed, ZKFC is responsible for switching, and it is necessary to prevent the occurrence of brain split phenomenon.
Must ensure that ssh can log in without a password between two NameNode
To prevent brain fissure, there is only one NameNode providing services at the same time.
2.7.2 ZooKeeper
ZooKeeper provides the following features:
Fault detection: each NameNode in the cluster maintains a persistent session in the ZooKeeper. If the machine crashes, the session in the ZooKeeper will be terminated, and the ZooKeeper notifies another NameNode that a failover is required.
Active NameNode selection: ZooKeeper provides a simple mechanism for uniquely selecting a node in the active state. If the current active NameNode crashes, another node may obtain a special exclusive lock from the ZooKeeper to indicate that it should become an active NameNode.
2.7.3 ZKFC process
There is a ZK client such as ZKFC (ZKFailoverController) on the NameNode host, which is responsible for monitoring and managing NameNode status. ZKFC is responsible for:
Health monitoring: ZKFC periodically detects collisions with NameNode monitoring on the same host.
ZooKeeper session management: when NameNode is healthy, ZKFC keeps the session open with ZK cluster, and ZKFC also holds a znode lock. If the session terminates, the lock node will be deleted automatically.
ZooKeeper-based selection: when ZKFC discovers that the local NameNode is healthy, it will try to acquire the znode lock, and if it is successful, the Active status will be obtained.
3 MapReduce
MapReduce is a programming framework for distributed computing programs and a core framework for data analysis and calculation based on Hadoop. The process is divided into two stages: the Map phase and the Reduce phase.
Map is responsible for breaking down a task into multiple tasks. The concurrent instances of MapTask at this stage run completely in parallel and are irrelevant to each other.
Reduce is responsible for summarizing the results of multiple tasks. The ReduceTask concurrent instances at this stage are unrelated to each other, but their data depends on the output of all MapTask concurrent instances from the previous phase.
The MapReduce programming model can only contain one Map phase and one Reduce phase. If the user's business logic is very complex, then only multiple MapReduce programs can be run serially.
The user writing MR task time program implementation part is divided into three parts: Mapper, Reducer, Driver (submit the client running mr program).
3.1 advantages and disadvantages
3.1.1 benefits
Easy to program
Simply implement some interfaces to complete a distributed program, you write a distributed program is the same as writing a serial program, similar to eight-part programming.
Good expansion
When computing resources are insufficient, you can simply increase the number of machines to expand computing power.
High fault tolerance
After the MapReduce task is deployed on multiple machines, if one of them dies, the system will transfer the task automatically to ensure that the task is executed correctly.
Suitable for PB-level data offline processing
For example, Meituan's cluster concurrency of 3K nodes provides super-large data processing capacity.
3.1.2 shortcomings
Not good at real-time computing
MapReduce does not return the result in milliseconds like MySQL.
Not good at streaming computing
The input data of streaming computing is dynamic, while the input data set of MapReduce is static.
Not good at DAG computing
Multiple applications have dependencies. The results of MapReduce jobs will lead to a large number of disk IO, resulting in low performance. Go to Spark at this time.
3.2 Serialization
Serialization
Convert objects in memory into byte sequences (or other data transfer protocols) for storage (persistence) and network transmission.
Deserialization
The received byte sequence (or other data transfer protocol) or persistent data from the hard disk is converted into an object in memory.
Because Hadoop needs serialization when communicating between clusters or RPC calls, and it requires serialization to be fast, small in size, and small in bandwidth. The serialization that comes with Java is a heavyweight framework, and object serialization will be accompanied with additional information, such as various verification information, header, inheritance system, and so on. So Hadoop developed its own serialization framework.
Java type Hadoop Writable type booleanBooleanWritablebyteByteWritableintIntWritablefloatFloatWritablelongLongWritabledoubleDoubleWritableStringTextmapMapWritablearrayArrayWritable
3.3 MapTask parallelism
Data blocks: Block is a HDFS that physically divides data into chunks.
Data slicing: data slicing only logically slices the input and does not slice it on disk for storage.
Slicing core notes:
The parallelism of the Map phase of a Job and the number of slices when the client submits the Job
Each Split slice allocates one MapTask parallel instance processing
Slice size in the case of the model = BlockSize
Slicing does not take into account the overall size of the dataset, but is sliced individually for each file.
3.3.1 FileInputFormat slice source code tracking
Source tracking of FileInputFormat slices
The program first finds the target data storage directory
Start traversing each file in the directory. Each file will do the following
Get slice size. By default, slice size = blocksize
Start slicing, each slice to determine whether the rest of the block is greater than 1.1 times, not greater than it is divided into a slice.
The slice information is written to the slice planning file.
The core slicing process is completed in the getSplit method.
InputSplit only records the slice metadata information, such as the starting position, length and the list of nodes.
3.3.2 slice size calculation
SplitSize= Math.max (minSize,Math.min (maxSize,blockSize))
Mapreduce.input.fileinputformat.split.minsize default 1
Mapreduce.input.fileinputformat.split.maxsize default Long.MAXValue
BlockSize defaults to 128m
Maxsize: if this parameter is smaller than blockSize, it causes the slice to become smaller, and it is equal to the entire configuration parameter.
Minsize: if this parameter is adjusted larger than blockSize, the slice size will be larger than blockSize.
3.3.3 examples of slices
Examples of slicing
3.4 FileInputFormat
3.4.1 introduction to the implementation class
The number of MR task input files varies, and an interface and several implementation classes are defined for different types of MR to read different data.
Input inheritance relationship
TextInputFormat
Classes are used by default, each piece of data is read by row, and Key is the offset,Value = row content of that line of data.
KeyValueTExtInputFormat
Each line is a record, separated by the specified delimiter into Key and Value. The default is\ t.
NLineInputFormat
Under this model, when each map deals with InputSplit, it is no longer divided according to the Block block, but the file is divided according to the specified number of lines N.
Custom InputFormat
Basic interface, rewrite RecordReader, read one complete file at a time and encapsulate it as KV, and use SequenceFileOutPutFormat to output merged files.
CombineTextInputFormat
For scenarios with too many small files, logically merge multiple small files into a slicing task. More important
3.4.2 CombineTextInputFormat
The default framework TextInputFormat slicing mechanism is to slice the task by file, no matter how small the file is, it will be a separate slice and will be handed over to a MapTask, so that if there are a large number of small files, a large number of MapTask will be generated, and the processing efficiency is extremely inefficient. CombineTextInputFormat can logically plan multiple small files into a single slice so that multiple small files can be handed over to a MapTask for processing. It mainly includes virtual stored procedures and slicing procedures.
CombineTextInputFormat.setMaxInputSplitSize (job, 4194304); / / 4m
Virtual stored procedure:
When the file = 2*SplitSize, cut a piece with SplitSize, and the rest if
< 2 * SplitSize 则对半分。 切片过程: 判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。 如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。 切片过程 3.6 OutputFormat OutputFormat 是 MapReduce 输出的基类,常见的实现类如下: 3.5.1 TextOutputFormat 系统默认输出格式,把每条记录写为文本行,他的K跟V是任意类型,系统在写入时候会统一转化为字符串。 3.5.2 SequenceFileOutputFormat 此模式下的输出结果作为后续MapReduce任务的输入,该模式下数据格式紧凑,很容易被压缩。 3.5.3 自定义OutputFormat 如果需求不满足可按需求进行自定义。 自定义类继承自FileOutputFormat。 重写RecordWriter,改写具体输出数据的方法write。 3.6 MapReduce 流程 3.6.1 整体流程图 MapReduce流程 MapTask 工作机制 Read阶段:MapTask 通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。 Map阶段:将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。 Collect收集阶段:它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。 Spill阶段:先按照分区进行排序,然后区内按照字典对key进行快排,并在必要时对数据进行合并、压缩等操作。 Combine阶段:选择性可进行MapTask内的优化提速。 ReduceTask 工作机制 Copy阶段:从所有的MapTask中收集结果然后决定将数据放入缓存还是磁盘。 Merge阶段:copy数据时后天会对磁盘还有内存数据进行Merge。 Sort阶段:ReduceTask需对所有数据进行一次归并排序,方便执行reduce 函数。 Reduce阶段:调用用户 reduce() 函数将计算结果写到HDFS上。 3.6.2 Shuffle Shuffle机制 MapReduce 的核心就是 Shuffle 过程,Shuffle 过程是贯穿于 map 和 reduce 两个过程的!在Map端包括Spill过程,在Reduce端包括copy和sort过程。 具体Shuffle过程如下: MapTask 收集我们的map()方法输出的kv对,放到内存缓冲区中。 从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件,溢出前会按照分区针对key进行区内快排。 多个溢出文件会被合并成大的溢出文件。 在溢出过程及合并的过程中,都要调用 Partitioner 进行分区和针对key进行排序。 ReduceTask 根据自己的分区号,去各个 MapTask 机器上取相应的结果分区数据。 ReduceTask 对收集后的数据进行合并跟归并排序。 进入 ReduceTask 的逻辑运算过程,调用用户自定义的reduce()方法。 Shuffle 中的缓冲区大小会影响到 MapReduce 程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。 3.6.3 Partition MapReduce 默认的分区方式是hashPartition,在这种分区方式下,KV 对根据 key 的 hashcode 值与reduceTask个数进行取模,决定该键值对该要访问哪个ReduceTask。 public int getPartition(K2 key, V2 value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; // numReduceTasks 默认 = 1 所以导致默认的reduce结果 = 1 } 自定义的时候一般就是类继承Partitioner然后重写getPartition 方法。用户也可以设置ReduceTask数量,不过会遵循如下规则。 如果 ReduceTask 数 >The number of getPartition, will produce several more empty output part-r-000xx.
If the number of 1 < ReduceTask < getPartition, some data cannot be placed and an error will be reported.
If ReduceTask = 1, no matter how many partition files are output by the MapTask side, the result is a file.
Partitions must start at 0 and accumulate gradually.
For example, suppose the number of custom partitions is 5.
Job.setNumReduceTasks (1): will run normally, but will produce an output file.
Job.setNumReduceTasks (2): error will be reported.
Job.setNumReduceTasks (6): > 5, the program will run normally and an empty file will be generated.
3.6.4 Ring buffer
The output of Map is processed by Collector, and each Map task continuously outputs key-value pairs to a ring data structure constructed in memory. The ring data structure is used to use memory space more efficiently and to put as much data in memory as possible.
The ring data structure is actually a byte array byte [], called kvbuffer, with a default value of 100m. It mainly stores data and metadata. There is a demarcation point in the middle, and the demarcation point is variable. When the size of the buffer written by the ring buffer reaches 80% to meet the overflow condition, the spill is overwritten. The system has two threads, one responsible for writing data and the other responsible for spill data.
Data:
Store Key + Value + bufindex. Among them, the bufindex (that is, the storage direction of the data) has been growing upward, for example, the initial value of bufindex is 0, after an int-type key is written, the bufindex grows to 4, and after an int-type value is written, the bufindex grows to 8.
Metadata:
Metadata is created for sorting and is data about data description.
Kvmeta = Partition + keystart + valstart + valLength, occupying a total of 4 Int lengths, where the length of K = the starting point of V-the starting point of K.
Kvmeta's storage pointer Kvindex jumps down four squares at a time, and then fills the quad data one by one. For example, the initial position of Kvindex is-4, when the first key-value pair is written, (Kvindex+0) stores the starting position of partition, (Kvindex+1) stores keystart, (Kvindex+2) stores valstart, and (Kvindex+3) stores value length, and then Kvindex jumps to-8 position. After the second key-value pair and index are written, Kvindex jumps to-12 position.
Kvmeta.put (kvindex + PARTITION, partition); 2kvmeta.put (kvindex + KEYSTART, keystart); 3kvmeta.put (kvindex + VALSTART, valstart); 4kvmeta.put (kvindex + VALLEN, distanceTo (valstart, valend)); 5Compact / advance kvindex change the value of index 4 positions at a time! 6kvindex = (kvindex-NMETA + kvmeta.capacity ())% kvmeta.capacity ()
Ring buffer
3.6.5 Combiner merger
Combiner is a component in addition to Mapper and Reducer in MR programs.
Combiner runs on each node where MapTask resides, and Reducer accepts all Mapper output.
Combiner means local summary to reduce network transmission.
When using Combiner, be careful not to affect the final business logic! For example, finding the average cannot be used. Make peace and OK.
3.6.6 about MapReduce sorting
The most important operation of the MapReduce framework is sorting. Both MapTask and ReduceTask will sort quickly according to key in dictionary order.
MapTask quickly writes the buffer data to disk, and then the disk files are merged and sorted.
ReduceTask uniformly merges and sorts all data in memory and disk.
3.6.7 ReduceJoin and MapJoin
Reducejoin
Idea: by taking the association condition as the Key output of Map, the data of the two tables satisfying the Join condition and carrying the data source file are sent to the same ReduceTask, and the data concatenation information is merged on the reduce side.
Disadvantages: the merge operation is completed on the reduce side, the processing pressure on the Reduce side is too high, and the Reduce side is easy to produce data tilt.
MapJoin
Applicable: suitable for scenarios where a table is very small and a table is very large.
Idea: cache multiple tables on the Map side and process the business logic in advance, so as to increase the business on the Map side, reduce the pressure on the data on the Reduce side, and reduce the data tilt as much as possible.
3.6.8 pay attention
ReduceTask = 0 means there are no Reduce nodes, and the number of output files is the same as the number of Map.
ReduceTask defaults to 1, so the result is a file.
The number of ReduceTask is not arbitrarily set, depending on the cluster performance and the result requirements.
When dealing with Mapper logically, three of these methods, map, setup and cleanup, can be implemented according to business requirements.
3.7 Compression
Compression is an optimization strategy to improve the running efficiency of Hadoop. By compressing the data in the running process of Mapper and Reducer, it can reduce the transmission of disk space and network, and finally improve the running speed of MR. However, it should be noted that compression also brings burden to CPU operations.
Basic principles of compression:
Compute-intensive tasks, less compression.
IO intensive tasks, multi-compression.
Compression format comes with algorithm extension can be split after compression, code modification DEFLATE is DEFLATE.deflate No need to modify Gzip is DEFLATE.gz No need to modify bzip2 is bzip2.bz2 No need to modify Snappy No Snappy.snappy No need to modify LZO No LZO.lzo requires indexing
You also need to specify the input format
4 YARN
Yarn is a resource scheduling platform, which is responsible for providing server computing resources for computing programs, which is equivalent to a distributed operating system platform, while computing programs such as MapReduce are equivalent to applications running on the operating system.
4.1 basic composition
Yarn architecture
YARN is mainly composed of ResourceManager, NodeManager, ApplicationMaster and Container.
ResourceManager
Processing client request
Monitoring NodeMananger
Start or monitor ApplicationMaster
Allocation and scheduling of computing resources
NodeManager
Manage resources on a single node
Deal with ResourceManager's orders.
Processing commands from ApplicationMaster
ApplicationMaster
Responsible for data segmentation.
Request resources for the application and assign them to internal tasks.
Task monitoring and fault tolerance.
Container
Container is the abstraction of resources in YARN, which encapsulates multi-dimensional resources on a node, such as memory, CPU, disk, network and so on.
YarnChild is actually a process that runs programs. The Maptask / ReduceTask requested from Resouce Manager when the MrAppMaster runs the program.
4.2 Yarn scheduling MapReduce tasks
Yarn scheduling process
When the MR program is submitted to the node where the client is located, the roughly running process is as follows:
Job submission
Client calls the job.waitForCompletion method YarnRunner to submit the MapReduce job to the entire cluster. Client applies to RM for an assignment id.
RM returns the submission path and job id of the job resource to Client.
Client submits jar packages, slice information, and configuration files to the specified resource submission path.
After Client has submitted the resources, apply to RM to run MrAppMaster.
Job initialization
When RM receives a request from Client, add the Task to the capacity scheduler.
An idle NodeManager gets the Task.
The NodeManager creates the Container and produces the MRAppMaster.
Download the resources submitted by Client locally.
Task assignment
MRAppMaster requests RM to run multiple MapTask task resources.
RM assigns the task of running MapTask to the two NodeManager. The allocation principle is to give priority to jar and data on the same machine, followed by as much as possible in the same computer room. Finally, come to a random idle machine.
Task running
MR sends a program startup script to two NodeManager that receive the task, and each of these NodeManager starts MapTask,MapTask to sort the data partitions.
MrAppMaster waits for all the MapTask to finish running, and then applies to the RM container to run ReduceTask.
ReduceTask gets the data of the corresponding partition from MapTask.
After the program is run, MR will apply to RM to log itself off.
Progress and status updates
Tasks in YARN return their progress and status (including counter) to the application manager, and the client requests progress updates to the application manager every second to show it to the user.
Homework completed
In addition to requesting the progress of the job from the application manager, the client checks whether the job is completed by calling waitForCompletion () every 5 seconds. After the job is completed, the application manager and Container clean up the work status. The information about the job is stored by the job history server for later user verification.
4.3 Resource Scheduler
Currently, there are three main types of Hadoop job schedulers: FIFO, Capacity Scheduler, and Fair Scheduler. The default resource scheduler for Hadoop2.7.2 is Capacity Scheduler.
4.3.1 FIFO
FIFO scheduling
4.3.2 capacity Scheduler Capacity Scheduler
Capacity scheduler
Multiple queues are supported, each queue is configured with certain resources, and each queue adopts FIFO policy.
In order to prevent the same Tong Hu job from monopolizing queue resources, there will be a limit on the amount of resources that the same user can submit jobs.
Calculate only the ratio of the number of running tasks in each queue to the number of tasks it should be allocated, and select the queue with the lowest ratio (the least idle).
Queue tasks are sorted according to job priority and submission time, taking into account user resource constraints and memory constraints.
For example, job1, job2, and job3 allocation are also running in parallel at the top.
4.3.3 Fair Scheduler Fair Scheduler
Multiple queues and multiple users are supported, resources in each queue can be configured, and jobs in the same queue fairly share all resources in the queue.
Fair scheduler
For example, there are three task queues: queue1, queue2 and queue3. The job in each queue allocates resources according to priority. The higher priority gets more resources, but ensures that each task is assigned to resources.
The gap between the ideal resources required for each task and the actual resources obtained is called a vacancy, and the same queue is carried out according to the level of the vacancy, and the larger the vacancy is, the more priority is given to obtaining resources.
4.4 Task conjecture execution
The job completion time depends on the slowest task completion time. 99% of the Map tasks in the system are completed, and only a few Map are always slow, and the system will find a slow task, such as a task running much slower than the average speed of the task. Start a backup task for the lag task and run it at the same time. Whoever finishes running first will adopt the result.
5 MapReduce optimization method
MapReduce optimization method mainly considers six aspects: data input, Map phase, Reduce phase, IO transmission, data tilt problem and common tuning parameters.
5.1 data entry
When collecting data, use Hadoop Archive to package a number of small files into a Har file.
Before business processing, SequenceFile consists of a series of KV, key= file name, value= file content, merging a large number of small files into large files.
In MapReduce processing, CombineTextInputFormat is used as input to solve the scene of a large number of small files on the input side.
Enabling JVM reuse for a large number of small file tasks can be accelerated, and JVM reuse allows JVM instances to be reused N times in the same job. The value of N can be configured in the mapred-site.xml file of Hadoop, usually between 10 and 20.
5.2 Map Pha
Reduce the number of overwrites of Spill, adjust the size of circular cache, and reduce disk IO.
Reduce the number of Merge merging, increase the number of Merge file size reduction.
Combine processing is carried out on the map side without affecting the business.
5.3 Reduce Pha
Set a reasonable number of Map and REduce, too little will cause Task to wait. Too much will lead to fierce competition for resources.
Set the coexistence of Map and Reduce phases, and Reduce can also run after map is running to a certain extent.
To avoid using the Buffer on the Reduce,Reduce side, you should also set it reasonably to prevent overwriting to disk as far as possible.
5.4 IO transmission
Data compression is used to reduce the network IO time.
Use SequenceFile binaries.
5.5 data skew
The partition boundary value is set by sampling the data to get the result set.
Custom partition.
Use Combine to reduce data skew.
Adopt MapJoin and avoid ReduceJoin as much as possible.
This is the end of the content of "what are the characteristics of Hadoop". Thank you for reading. If you want to know more about the industry, you can follow the website, the editor will output more high-quality practical articles for you!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.