
How to organize and use data in HDFS

Updated: 2025-01-16. Source: SLTechnology News&Howtos (shulou), Internet Technology.


This article explains how to organize and use data in HDFS: directory layout, data tiers, partitioning, compacting small files, and moving data atomically.

4.1 Organizing data

Organizing data is one of the most challenging aspects of using Hadoop. Different departments and people in the enterprise, such as data scientists and cluster administrators, each have their own requirements for the data. Worse, these requirements usually surface after the data applications are in production and a large amount of data has already accumulated.

Organizing data in Hadoop has multiple dimensions. First, you need to learn how to organize data in HDFS. After that you face operational questions such as partitioning and compacting the data, deciding whether to enable Kerberos to secure the cluster, and managing and communicating data changes. This chapter focuses on some of the more complex issues of organizing data, including partitioning and compaction, starting with how to structure data in HDFS.

4.1.1 Directory and file layout

Defining a cluster-wide standard for how data is organized is worthwhile because it makes data easier to locate and helps you manage concerns that can be addressed through how data is stored. Because we work within what a file system can express, a common way to arrange data is a hierarchy that mirrors the organizational or functional structure of the enterprise. For example, if you work on an analytics team and are bringing a new dataset into the cluster, one way to organize the directory is shown in figure 4.1.

Figure 4.1 Sample HDFS directory layout

Before you start, it is best for the enterprise to settle on a data format such as Avro, which lets the schema evolve over time. By including a version number in the directory structure, you keep the flexibility to move to a new data format later and can use the directory path to communicate which file format is in use.

The only challenge with putting a version number in the directory is communicating the change to data consumers effectively. If that becomes a problem, consider HCatalog as a way to abstract the data format away from clients.

Partition by date and other fields

You may use the directory structure to model the data evolution needs of different departments, but why partition further by date? This technique was used in the early days of Hive to speed up queries. If you put all the data in one directory, every access performs the Hadoop equivalent of a full table scan. It is more sensible to partition the data according to how it will be accessed.

It is difficult to know in advance exactly how the data will be accessed, but segmenting it by the date it was generated is a reasonable first partitioning scheme. If the data carries no date, talk to the data producer about adding one, because the time an event or record was created is a key data point that should always be captured.
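As a minimal sketch of the idea, the following plain Java helper assembles a versioned, date-partitioned path. The /data root, the team and dataset names, and the v<version>/yyyy/MM/dd ordering are assumptions made for illustration; they are not taken from figure 4.1.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: builds a versioned, date-partitioned HDFS-style path.
final class PartitionPaths {
    // One directory level each for year, month, and day.
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy/MM/dd");

    static String pathFor(String team, String dataset, int version, LocalDate eventDate) {
        return "/data/" + team + "/" + dataset + "/v" + version + "/" + eventDate.format(DAY);
    }

    public static void main(String[] args) {
        // Example values are made up.
        System.out.println(pathFor("analytics", "clicks", 1, LocalDate.of(2014, 3, 7)));
    }
}
```

Keeping the version number above the date directories means a format change can be introduced as a new sibling directory without touching existing partitions.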

4.1.2 Data tiers

In his 2012 Strata presentation, Eric Sammer proposed the idea of tiering data storage. This fits well with a main principle of Nathan Marz's Lambda architecture: never delete or modify the original data.

At first glance this seems to make little sense. Surely you only need to extract the important parts of a data source and can discard the rest; keeping all the raw data looks wasteful, especially when some of it is not actively used. But you can never be sure that valuable information will not be extracted from that unused data in the future.

Software also occasionally has bugs. Imagine that you transform data in software, get the results you want, and decide to discard the source data. If you then discover an error in the transformation logic, you cannot go back and regenerate the results, because the source data is gone.

Therefore, it is recommended that enterprises organize data storage into the following tiers:

The first tier: raw data. This is unaltered data captured from the source. Data in this tier should never be modified, because the logic that generates derived or aggregated data may contain errors, and if the raw data has been discarded there is nothing to recover from.

The second tier: derived data, created from the raw data. In this tier you can perform deduplication and any other data governance steps.

The third tier: aggregated data. This is calculated from the derived data and may be loaded into HBase or another NoSQL system of your choice for real-time access in production and for analysis.

The data tiers should also be reflected in the directory layout so that users can easily tell them apart.

Once you have determined the directory layout used to partition the data, the next step is to figure out how to import the data into those partitions.

4.1.3 Partitioning

Partitioning is the process of taking a dataset and splitting it into distinct parts. These parts are the partitions, and they represent a meaningful division of the data. A common partition key is time, because it lets people who query the data narrow it to a specific time window. Section 4.1.1 treated time as a key factor in deciding the layout of data in HDFS.

What if you already have a large dataset in HDFS that needs to be partitioned? This section describes two methods for partitioning data.

Using MultipleOutputs to partition data

Imagine that you are streaming stock price data into HDFS and want to write a MapReduce job that partitions the data by the date of each stock quote. To do this, you need to write multiple output files from a single task, so let's see how to achieve that.

Problem

The data needs to be partitioned, but most output formats create only one output file per task.

Solution

Use the MultipleOutputs class bundled with MapReduce.

Discussion

The MultipleOutputs class in Hadoop bypasses the normal channel for generating output in Hadoop. It provides a separate API for writing partitioned output and writes that output directly to the task attempt directory in HDFS. You can continue to collect output through the standard write method on the job's Context object and use MultipleOutputs for the partitioned output alongside it, or you can use only the MultipleOutputs class and ignore the standard context-based output.

In this technique, you will use MultipleOutputs to partition stocks by quote date. The first step is to set up MultipleOutputs for use in the job. In the driver, you specify the output format and the key and value types.

Why do I need to name the output in the driver? You may be wondering why MultipleOutputs requires an output name ("partition" in this example). This is because MultipleOutputs supports two modes of operation: static partitioning and dynamic partitioning.

Static partitioning works well if you know the partition names ahead of time. It also gives you the flexibility to specify a different output format for each partition (you just call MultipleOutputs.addNamedOutput multiple times with different named outputs). With static partitioning, the output name you specify when calling addNamedOutput is the same name you use when emitting output in the mapper or reducer.

This technique focuses on dynamic partitioning, because in most cases you do not know the partition names in advance. You still need to supply an output name, but it is effectively ignored because the partition name is specified dynamically in the mapper or reducer.

The map (or reduce) class obtains a handle to a MultipleOutputs instance and then writes to the partitioned output using its write method. Note that the third parameter is the partition name, which here is the stock date.

Don't forget the close() method! It is important to call close on MultipleOutputs when the task completes. Otherwise the output may be missing data or the files may even be corrupted.

Running the example generates many partition files for a single mapper. The original map output file is also present but empty, because no records were emitted through the Context object.

This example uses a map-only job, but in production you will probably want to limit the number of tasks that create partitions. There are two ways to do this:

Use CombineFileInputFormat or a custom input format to limit the number of mappers in the job.

Use a reducer, where you can explicitly specify a reasonable number of reducers.

Summary

MultipleOutputs has a lot going for it: it supports both the "old" and "new" MapReduce APIs and multiple output format classes. However, there are some things to be aware of when using it:

When using MultipleOutputs in a mapper, remember that you end up with NumberOfMappers * NumberOfPartitions output files, which, from experience, can bring down clusters when both values are large.

Each partition incurs the overhead of an open HDFS file handle for the duration of the task.

You often end up with a large number of small files that accumulate across multiple runs of the partitioning job. This can be mitigated with a compaction strategy (see Section 4.1.4).

Although Avro comes with the AvroMultipleOutputs class, it is quite slow because of inefficiencies in the code.
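To make the first caveat concrete, here is a back-of-the-envelope sketch of the worst-case file count for a map-only job. The class name and the numbers in main are made up for illustration.

```java
// Rough upper bound on output files when every mapper sees every partition.
final class OutputFileEstimate {
    static long maxOutputFiles(long mappers, long partitions) {
        // multiplyExact fails loudly instead of silently overflowing.
        return Math.multiplyExact(mappers, partitions);
    }

    public static void main(String[] args) {
        // Hypothetical job: 1,000 mappers, one partition per day of a year.
        System.out.println(maxOutputFiles(1_000, 365) + " files in the worst case");
    }
}
```

Every one of those files also costs NameNode memory, which is why the number of tasks that write partitions is worth limiting.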

In addition to MultipleOutputs, Hadoop also ships a MultipleOutputFormat class with similar functionality. Its main drawbacks are that it supports only the old MapReduce API and that only one output format can be used for all partitions.

Another partitioning strategy is to use the MapReduce partitioner, which can help reduce the large number of files that MultipleOutputs may generate.

Using a custom MapReduce partitioner

Another way to partition is to use the partitioning facility built into MapReduce. By default, MapReduce uses a partitioner that computes the hash of each map output key and takes it modulo the number of reducers to determine which reducer the record is sent to. You can write a custom partitioner and route records according to your own partitioning scheme to control how partitions are generated.
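The default behavior can be sketched in plain Java. This mirrors the arithmetic of Hadoop's HashPartitioner (mask the hash to a non-negative value, then take it modulo the number of reducers), but it is a standalone illustration that does not depend on Hadoop; the class and method names are hypothetical.

```java
// Standalone illustration of hash partitioning; not the Hadoop class itself.
final class HashPartitionSketch {
    static int partitionFor(Object key, int numReducers) {
        // Clearing the sign bit keeps the result in the range [0, numReducers).
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("2009-01-02", 10));
    }
}
```

Because the reducer index depends only on the key's hash, two different dates can land on the same reducer, which is what a custom partitioner avoids.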

This technique has an additional benefit over the previous one: you usually end up with fewer output files, because each reducer creates only one output file, whereas MultipleOutputs produces N output files per map or reduce task, one per partition.

Problem

Partition the input data.

Solution

Write a custom partitioner that routes records to the appropriate reducer.

Discussion

The custom partitioner exposes a helper method to the MapReduce driver that lets you define a mapping from date to partition and writes this mapping to the job configuration. When MapReduce loads the partitioner it calls the setConf method, in which the partitioner reads the mapping into a map that is then used when partitioning.

The driver code needs to set up the custom partitioner configuration. The partitions in this example are dates, and you want each reducer to correspond to a unique date. The stock sample data has 10 unique dates, so you configure the job with 10 reducers and call the partitioner helper function described earlier to map each unique date to a unique reducer.
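The date-to-reducer mapping described above can be sketched without Hadoop. In this hypothetical example, encode plays the role of the driver-side helper that writes the mapping into a single configuration string, and decode plays the role of what a partitioner would do in setConf. The string format (date=index pairs separated by commas) is an assumption, not the original implementation.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hadoop-free sketch of a date-to-reducer mapping carried in a config string.
final class DatePartitionMapping {
    // Driver side: give each unique date its own reducer index.
    static String encode(List<String> uniqueDates) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < uniqueDates.size(); i++) {
            if (i > 0) sb.append(',');
            sb.append(uniqueDates.get(i)).append('=').append(i);
        }
        return sb.toString();
    }

    // Task side: rebuild the lookup map from the configuration value.
    static Map<String, Integer> decode(String encoded) {
        Map<String, Integer> mapping = new LinkedHashMap<>();
        if (encoded.isEmpty()) return mapping;
        for (String entry : encoded.split(",")) {
            String[] kv = entry.split("=", 2);
            mapping.put(kv[0], Integer.parseInt(kv[1]));
        }
        return mapping;
    }

    // Routing decision for one record, keyed by its date.
    static int partitionFor(String date, Map<String, Integer> mapping) {
        Integer partition = mapping.get(date);
        if (partition == null) {
            throw new IllegalArgumentException("No partition configured for date " + date);
        }
        return partition;
    }

    public static void main(String[] args) {
        String conf = encode(Arrays.asList("2009-01-02", "2009-01-05", "2009-01-06"));
        System.out.println(conf + " -> " + partitionFor("2009-01-05", decode(conf)));
    }
}
```

Failing on an unknown date, rather than falling back to a default reducer, keeps the one-date-per-output-file guarantee explicit.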

Apart from extracting the stock date from the input data and emitting it as the output key, the mapper does very little.

The job is run in the same way as the previous example.

The job generates 10 output files, each containing the stocks for a single day.

Summary

Using the MapReduce framework to partition data naturally brings the following advantages:

Data within each partition is sorted, because the shuffle ensures that all data streamed to a reducer is sorted. This allows you to use optimized join strategies on the data.

Data can be deduplicated in the reducer, which is another benefit of the shuffle phase.

The main problem to watch for with this technique is data skew. You want the load to be spread across the reducers as evenly as possible, which is hard when the data has a natural skew. For example, if the partitions are days, most records may belong to a single day with only a few records for the day before or the day after. In that case you would ideally assign most of the reducers to that one day and perhaps one or two to the neighboring days. You can also sample the input and determine the optimal number of reducers dynamically from the sample.
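One possible way to turn a sample into a reducer allocation is sketched below. The heuristic (one reducer per date, with the remaining reducers shared out in proportion to each date's share of the sample, rounded down) is an assumption chosen for illustration and is not a method given in the text. A partitioner would still have to spread a date's records across the reducers assigned to it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative heuristic only: proportional reducer allocation from sample counts.
final class SkewAwareAllocation {
    static Map<String, Integer> allocate(Map<String, Long> sampleCounts, int totalReducers) {
        if (totalReducers < sampleCounts.size()) {
            throw new IllegalArgumentException("Need at least one reducer per partition");
        }
        long totalRecords = 0;
        for (long count : sampleCounts.values()) totalRecords += count;

        // Reducers left over after every partition has received one.
        int spare = totalReducers - sampleCounts.size();
        Map<String, Integer> allocation = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : sampleCounts.entrySet()) {
            // Rounding down guarantees the total never exceeds totalReducers.
            int extra = totalRecords == 0 ? 0 : (int) (spare * e.getValue() / totalRecords);
            allocation.put(e.getKey(), 1 + extra);
        }
        return allocation;
    }

    public static void main(String[] args) {
        Map<String, Long> sample = new LinkedHashMap<>();
        sample.put("2009-01-01", 10L);   // made-up sample counts
        sample.put("2009-01-02", 980L);
        sample.put("2009-01-03", 10L);
        System.out.println(allocate(sample, 10));
    }
}
```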

After generating the partitioned output, the next challenge is dealing with the potentially large number of small files that partitioning produces.

4.1.4 Data compaction

Small files in HDFS are sometimes unavoidable. You may be using a partitioning technique like those described earlier, or the data may simply arrive in HDFS in small files. Either way, you expose some weaknesses of HDFS and MapReduce, such as:

Hadoop's NameNode keeps all HDFS metadata in memory for fast metadata operations. Yahoo estimated that each file takes up an average of 600 bytes of memory, so the metadata for 100 million files amounts to about 60 GB, all of which must be held in NameNode memory. Even with today's mid-range server RAM capacities, that is a lot of memory for a single process.

If the input to a MapReduce job is a large number of files, the number of mappers that run (assuming the files are text or splittable) equals the number of blocks those files occupy. If you run a MapReduce job whose input is thousands or millions of files, the job spends more time at the kernel layer creating and destroying map task processes than doing actual work.

Finally, if you run in a controlled environment with a scheduler, there may be a cap on the number of tasks a MapReduce job can use. Because each file (by default) results in at least one map task, this could cause the job to be rejected by the scheduler.
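The NameNode memory estimate is simple arithmetic, sketched here with the 600 bytes per file figure quoted above. At that rate, 100 million files come to about 60 GB and one billion files to about 600 GB. The class name is hypothetical and gigabytes are decimal (10^9 bytes).

```java
// Back-of-the-envelope NameNode heap estimate from a per-file metadata cost.
final class NameNodeMemoryEstimate {
    static final long BYTES_PER_FILE = 600L; // average quoted in the text

    static long metadataBytes(long files) {
        return Math.multiplyExact(files, BYTES_PER_FILE);
    }

    static double toGigabytes(long bytes) {
        return bytes / 1e9;
    }

    public static void main(String[] args) {
        long files = 100_000_000L;
        System.out.println(files + " files need about "
                + toGigabytes(metadataBytes(files)) + " GB of NameNode memory");
    }
}
```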

If you don't think you will have this problem, think again. What percentage of your files are smaller than the HDFS block size? And how much smaller: 50%, 70%, 90%? What if your big data project suddenly needs to scale to handle datasets several orders of magnitude larger? Isn't that why you chose Hadoop in the first place? Scaling should mean adding more nodes; the enterprise will not accept having to redesign its use of Hadoop and deal with migrating files. It is therefore best to think about and prepare for these problems as early as possible in the design phase.

This section describes some techniques for combining data in HDFS. Let's start with a utility called filecrush, which compacts small files together to create a smaller number of larger files.

Using filecrush to compact data

Compaction is the act of combining small files to produce larger files, which helps relieve heap pressure on the NameNode.

In terms of Hadoop version compatibility, the filecrush utility works only with Hadoop version 1. The compacter (https://github.com/alexholmes/hdfscompact) and Hadoop Archive (HAR) files are compatible with Hadoop version 2.

Problem

You want to compact small files to reduce the metadata that the NameNode needs to keep in memory.

Solution

Use the filecrush utility.

Discussion

The filecrush utility combines, or compacts, multiple small files into larger files. The utility is quite sophisticated and can:

Determine the size threshold below which files are compacted (and, by association, leave files that are large enough alone)

Specify the maximum size of the compacted files

Work with different input and output formats and different input and output compression codecs (useful for moving to a different file format or compression codec)

Swap smaller files with newer compacted files in place

We use filecrush in a simple example that crushes a directory of small text files and replaces them with gzipped SequenceFiles.

First, artificially create 10 input files in an HDFS directory.

Then run filecrush, which replaces the small files with a new large file and converts the text files to a compressed SequenceFile.

After running filecrush, you will find that the files in the input directory have been replaced by a single SequenceFile.

You can also run the hadoop fs -text command to view a text representation of the SequenceFile.

You will notice that all the original small files have been moved to the output directory specified in the command.

If you run filecrush without the --clone option, the input files remain untouched and the crushed file is written to the output directory.

Input and output file size thresholds

How does filecrush decide whether a file needs to be crushed? It looks at each file in the input directory and compares its size to the block size (in Hadoop 2 you can specify the block size with -Ddfs.block.size in the command). If the file is smaller than 75% of the block size, it is crushed. You can customize this threshold with the --threshold parameter; for example, to raise the value to 85%, specify --threshold 0.85.

Similarly, filecrush uses the block size to determine the output file size. By default it will not create output files that occupy more than eight blocks, but this can be customized with the --max-file-blocks parameter.
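The two size rules can be expressed as a short sketch. This only restates the rules as described above (strictly smaller than the threshold fraction of a block, and at most a given number of blocks per output file); it is not filecrush's own code, and the names are hypothetical.

```java
// Restates the size rules described in the text; not taken from filecrush itself.
final class CrushThreshold {
    // A file is a candidate when it is smaller than threshold * block size.
    static boolean isEligible(long fileBytes, long blockBytes, double threshold) {
        return fileBytes < blockBytes * threshold;
    }

    // Upper bound on the size of one compacted output file.
    static long maxOutputBytes(long blockBytes, int maxFileBlocks) {
        return Math.multiplyExact(blockBytes, (long) maxFileBlocks);
    }

    public static void main(String[] args) {
        long block = 128L * 1024 * 1024;   // assumed 128 MiB block size
        long file = 105L * 1024 * 1024;    // made-up 105 MiB file
        System.out.println("eligible at 0.75: " + isEligible(file, block, 0.75));
        System.out.println("eligible at 0.85: " + isEligible(file, block, 0.85));
        System.out.println("max output bytes: " + maxOutputBytes(block, 8));
    }
}
```

With a 128 MiB block, the 105 MiB file is left alone at the default threshold but becomes a candidate once the threshold is raised to 0.85.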

Summary

Filecrush is a simple and quick way to combine small files. It supports any type of input or output file as long as there are associated input format and output format classes. Unfortunately, it does not work with Hadoop 2, and the project has seen little activity over the past few years, so enterprises may rule it out for those reasons.

Because filecrush requires input and output formats, it falls short if you are working with binary data and need a way to combine small binary files.

Using Avro to store multiple small binary files

Suppose you are developing a project similar to Google Images in which you crawl the web and download image files from websites. The project operates at internet scale, so millions of files are downloaded and stored individually in HDFS. You already know that HDFS does not handle large numbers of small files well, so let's try Avro.

Problem

You want to store a large number of binary files in HDFS without running into NameNode memory limits.

Solution

The easiest way to deal with small binary files in HDFS is to package them into a larger container file. This technique reads all the files stored in a directory on local disk and saves them in a single Avro file in HDFS. You will also see how to use the Avro file in MapReduce to process the original file contents.

Discussion

Figure 4.2 shows the first part of this technique, in which you create the Avro file in HDFS. Doing so creates fewer files in HDFS, which means less data is held in NameNode memory, which in turn means you can store more.

Figure 4.2 Storing small files in Avro allows more files to be stored

Avro is a data serialization and RPC library invented by Doug Cutting, the creator of Hadoop. Avro has strong schema evolution capabilities that give it an advantage over competitors such as SequenceFile. It is covered in detail in Chapter 3 and is not described again here.

Look at the following Java code, which creates the Avro file:

Code 4.1 reads the directory containing small files and generates a single Avro file in HDFS

To run the code in this chapter, you need the Snappy and LZOP compression codecs installed on the host. How to install and configure them is not covered here.

Let's see what happens when you run this code against Hadoop's config directory (replace $HADOOP_CONF_DIR with the directory that contains the Hadoop configuration files).

Let's make sure that the output file is in HDFS.

To make sure everything works as expected, you can also write code that reads the Avro file from HDFS and outputs the MD5 hash of each file's contents.

This code is simpler than the write. Because Avro writes the schema into every Avro file, you do not need to give Avro any information about the schema during deserialization.
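The hashing step on its own can be sketched with the JDK's MessageDigest. This covers only the MD5 computation over a byte array; reading the records out of the Avro file is omitted, and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Computes the MD5 of a file's contents as a lowercase hex string.
final class Md5Sketch {
    static String md5Hex(byte[] contents) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(contents);
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                // Mask to treat the byte as unsigned before formatting.
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available on this JVM", e);
        }
    }

    public static void main(String[] args) {
        byte[] contents = "abc".getBytes(StandardCharsets.UTF_8);
        System.out.println("example-file: " + md5Hex(contents));
    }
}
```

Comparing these hashes with hashes of the original local files is a quick way to confirm that the contents survived the round trip through the container file.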

At this point you have the Avro file in HDFS. Although this chapter is about HDFS, the next thing you will probably want to do is process the files you wrote in MapReduce. This requires a map-only MapReduce job that reads the Avro records as input and writes a text file containing the file names and the MD5 hashes of the file contents, as shown in figure 4.3.

Figure 4.3 A map job reads the Avro file and outputs a text file

The following shows the code for this MapReduce job:

Code 4.2 A MapReduce job with an Avro file containing small files as input

If you run this MapReduce job over the Avro file created earlier, the job log files will contain the file names and hashes.

This technique assumes that you are working with a file format, such as image files, in which separate files cannot be concatenated together. If your files can be concatenated, you should consider that option. If you take that approach, try to make the file sizes at least as large as the HDFS block size to minimize the data stored in the NameNode.

Summary

You could also use Hadoop's SequenceFile as a mechanism for holding small files. SequenceFile is a more mature technology and has been around longer than Avro files. But SequenceFiles are Java-specific and do not provide the rich interoperability and versioning semantics that Avro offers.

Google's Protocol Buffers and Apache Thrift (which originated at Facebook) can also be used to store small files. However, neither has an input format that works with native Thrift or Protocol Buffers files. Another approach is to write the files into a zip file. The drawbacks are, first, that you have to write a custom input format to process the zip file and, second, that zip files are not splittable (unlike Avro files and SequenceFiles). This can be mitigated by generating multiple zip files and trying to make each close to the HDFS block size. Hadoop also has a CombineFileInputFormat that can feed multiple input splits (across multiple files) to a single map task, which greatly reduces the number of map tasks needed.

You can also create a tarball containing all the files and then produce a separate text file that contains the location of the tarball in HDFS. This text file is supplied as input to the MapReduce job, and the mapper opens the tarball directly. However, this approach circumvents data locality in MapReduce: the mapper is scheduled on the node that holds the text file, so it will probably have to read the tarball blocks from remote HDFS nodes, incurring unnecessary network I/O.

Hadoop Archive (HAR) files were created specifically to solve the small files problem. They are a virtual file system that sits on top of HDFS. The disadvantages of HAR files are that they cannot be optimized for local disk access in MapReduce and cannot be compressed. Hadoop version 2 supports HDFS Federation, in which HDFS is partitioned into multiple distinct namespaces, each managed independently by a separate NameNode. In effect, the overall cost of keeping block information in memory can be spread across multiple NameNodes, which supports a larger number of small files.

Finally, MapR, which provides a Hadoop distribution, has its own distributed file system that supports large numbers of small files. Adopting MapR for distributed storage is a big change to a system, so enterprises are unlikely to move to MapR just to mitigate this HDFS problem. There will be times when you want to work with small files in Hadoop, and using them directly bloats NameNode memory and slows MapReduce jobs. This technique alleviates those problems by packaging small files into larger container files. I chose Avro because it supports splittable files and compression and has an expressive schema language that helps with versioning. What if your files are large and you want to store them more efficiently? That is addressed in section 4.2.

4.1.5 Atomic data movement

Actions such as partitioning and compaction tend to follow a similar pattern: they generate output files in a staging directory and then atomically move them to the final destination directory once all the output files have been staged successfully. This raises some questions:

What trigger do you use to determine that you are ready to perform the atomic move?

How do you move data atomically in HDFS?

What impact does the data movement have on readers of the final data?
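The stage-then-move pattern itself can be illustrated on a local file system with java.nio. This is an analogy only: it uses the JDK's ATOMIC_MOVE option on local paths rather than any HDFS API, and the directory and file names are made up.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Local-filesystem analogy of staging output and publishing it atomically.
final class StageThenMove {
    static Path publish(Path stagingDir, Path finalDir, String fileName, String contents)
            throws IOException {
        Files.createDirectories(stagingDir);
        Files.createDirectories(finalDir);

        // Readers never look in the staging directory, so partial writes stay invisible.
        Path staged = stagingDir.resolve(fileName);
        Files.write(staged, contents.getBytes(StandardCharsets.UTF_8));

        // Publish with a single rename; fails if the move cannot be done atomically.
        return Files.move(staged, finalDir.resolve(fileName), StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("stage-demo");
        Path published = publish(root.resolve("staging"), root.resolve("final"),
                "part-00000", "AAPL,2009-01-02\n");
        System.out.println("published " + published);
    }
}
```

Keeping the staging and final directories on the same file system matters here, because an atomic move generally cannot cross file system boundaries.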

It may be tempting to perform the atomic move as a post-processing step in the MapReduce driver, but what happens if the client process dies before the MapReduce application completes? This is where using the OutputCommitter in Hadoop is useful, because you can perform any atomic file moves as part of the job rather than in the driver.

The next question is how to move data atomically in HDFS. For the longest time, the rename method on the DistributedFileSystem class (the concrete implementation that backs HDFS) was thought to be atomic. It turns out that in some situations it is not. This was remedied in HADOOP-6240, but for backward compatibility reasons the existing rename method was not updated. As a result, the rename method is still not truly atomic; instead, you need to use a new API. The code is cumbersome and works only with newer versions of Hadoop.

One thing HDFS lacks is the ability to swap directories atomically, which would be useful in situations such as compaction, where you need to replace the entire contents of a directory that is being used by other processes (such as Hive). There is a project called "Atomic Directory Swapping Operation" that may be helpful.
