Apache Beam official documentation

Updated 2025-01-16. From: SLTechnology News&Howtos

1. Overview

To use Beam, you first create a driver program using the classes in one of the Beam SDKs. The driver program defines your pipeline, including all of its inputs, transforms, and outputs. It also sets execution options for the pipeline (typically passed as command-line options). These include the pipeline runner, which in turn determines the back end on which the pipeline runs.

The Beam SDKs provide a number of abstractions that simplify large-scale distributed data processing. The same Beam abstractions work with both batch and streaming data sources. When you create a Beam pipeline, you can think about your data processing task in terms of these abstractions. They include:

- Pipeline: A Pipeline encapsulates the entire data processing task, from start to finish. This includes reading input data, transforming that data, and writing output data. All Beam driver programs must create a Pipeline. When you create the Pipeline, you must also specify the execution options that tell the Pipeline where and how to run.

- PCollection: A PCollection represents a distributed dataset that your Beam pipeline operates on. The dataset can be bounded, meaning it comes from a fixed source such as a file, or unbounded, meaning it comes from a source that is continually updated through a subscription or other mechanism. Your pipeline typically creates an initial PCollection by reading data from an external data source, but you can also create a PCollection from in-memory data in your driver program. From there, PCollections are the inputs and outputs of each step in the pipeline.

- Transform: A Transform represents a data processing operation, or a step, in the pipeline. Every Transform takes one or more PCollection objects as input, performs a processing function that you provide on the elements of those PCollection objects, and produces one or more output PCollection objects.

- I/O Source and Sink: Beam provides Source and Sink APIs for reading and writing data, respectively. A Source encapsulates the code needed to read data into your Beam pipeline from some external source, such as cloud file storage or a subscription to a streaming data source. A Sink likewise encapsulates the code needed to write the elements of a PCollection to an external data sink.

A typical Beam driver program works as follows:

- Create a Pipeline object and set the pipeline execution options, including the pipeline runner.

- Create an initial PCollection for the pipeline data, either by using the Source API to read data from an external source, or by using a Create transform to build a PCollection from in-memory data.

- Apply transforms to each PCollection. Transforms can change, filter, group, analyze, or otherwise process the elements in a PCollection. A transform creates a new output PCollection without consuming the input collection. A typical pipeline applies subsequent transforms to each new output PCollection in turn until processing is complete.

- Output the final, transformed PCollection(s), typically by using the Sink API to write data to an external sink.

- Run the pipeline using the designated pipeline runner.

When you run your Beam driver program, the pipeline runner you designated constructs a workflow graph of your pipeline based on the PCollection objects you created and the transforms you applied. That graph is then executed on the appropriate distributed processing back end, where it becomes an asynchronous "job" (or equivalent).

2. Create a Pipeline

The Pipeline abstraction encapsulates all the data and steps of your data processing task. A Beam driver program typically starts by constructing a Pipeline object and then uses that object as the basis for creating the pipeline's datasets as PCollections and its operations as Transforms.

To use Beam, your driver program must first create an instance of the Beam SDK class Pipeline (typically in the main() function). When you create your Pipeline, you also need to set some configuration options. You can set the pipeline's configuration options programmatically, but it is often easier to set the options ahead of time (or read them from the command line) and pass them to the Pipeline object when you create it.

// Start by defining the options for the pipeline.
PipelineOptions options = PipelineOptionsFactory.create();

// Then create the pipeline.
Pipeline p = Pipeline.create(options);

2.1 Configure pipeline options

Use the pipeline options to configure different aspects of your pipeline, such as the pipeline runner that will execute it and any runner-specific configuration the chosen runner requires. Your pipeline options may include information such as a project ID or a location for storing files.

When you run the pipeline on a runner of your choice, a copy of the PipelineOptions is available to your code. For example, you can read the PipelineOptions from a DoFn's context:

PipelineOptions options = context.getPipelineOptions();

Setting PipelineOptions from command-line arguments

You can configure your pipeline by creating a PipelineOptions object and setting its fields directly, but the Beam SDKs also include a command-line parser that you can use to set the fields of PipelineOptions from command-line arguments.

To read options from the command line, construct your PipelineOptions object as shown in the following sample code:

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

This interprets command-line arguments that follow the format:

--<option>=<value>

Note: appending the method .withValidation() checks for required command-line arguments and validates argument values.

Building your PipelineOptions this way lets you specify any option as a command-line argument.

Note: the WordCount example pipeline demonstrates how to set pipeline options at run time by using command-line options.
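To make the argument format concrete, here is a minimal plain-Java sketch of how arguments of the form --name=value can be split into option names and values. It is an illustration only and not Beam's parser: the class OptionArgsSketch and its parse method are hypothetical, the sample values are made up, and the real PipelineOptionsFactory additionally performs type conversion and validation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration only; hypothetical names, not part of the Beam SDK.
final class OptionArgsSketch {

    // Splits each "--name=value" argument into a name -> value entry.
    static Map<String, String> parse(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        for (String arg : args) {
            int eq = arg.indexOf('=');
            // Require the "--" prefix and at least one character of name before '='.
            if (!arg.startsWith("--") || eq < 3) {
                throw new IllegalArgumentException("Expected --name=value but got: " + arg);
            }
            options.put(arg.substring(2, eq), arg.substring(eq + 1));
        }
        return options;
    }

    public static void main(String[] args) {
        // Sample arguments; the values are placeholders.
        String[] sample = {"--runner=DirectRunner", "--myCustomOption=value"};
        System.out.println(parse(sample));
    }
}
```

Arguments that lack the leading dashes or the equals sign are rejected here with an exception, which loosely mirrors the idea of validating arguments up front.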

Create custom options

In addition to the standard PipelineOptions, you can add your own customization options. To add custom options, define an interface with getter and setter methods for each option, as shown in the following example:

public interface MyOptions extends PipelineOptions {
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
}

You can also specify a description, which appears when a user passes --help as a command-line argument, and a default value.

You set the description and default value using annotations, as follows:

public interface MyOptions extends PipelineOptions {
    @Description("My custom command line argument.")
    @Default.String("DEFAULT")
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
}

It is recommended that you register your interface with PipelineOptionsFactory and then pass the interface when creating the PipelineOptions object. When you register your interface with PipelineOptionsFactory, --help can find your custom options interface and add it to the output of the --help command. PipelineOptionsFactory also validates that your custom options are compatible with all other registered options.

The following sample code shows how to register your custom options interface with PipelineOptionsFactory:

PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation()
    .as(MyOptions.class);

Now your pipeline can accept --myCustomOption=value as a command-line argument.

3. Use PCollections

The PCollection abstraction represents a potentially distributed, multi-element dataset. You can think of a PCollection as "pipeline" data; Beam transforms use PCollection objects as inputs and outputs. As such, if you want to work with data in your pipeline, it must be in the form of a PCollection.

After you have created your Pipeline, you begin by creating at least one PCollection in some form. The PCollection you create serves as the input to the first operation in your pipeline.

3.1 Create a PCollection

You create a PCollection either by reading data from an external source using Beam's Source API, or by creating a PCollection from data stored in an in-memory collection class in your driver program. The former is typically how a production pipeline ingests data; Beam's Source APIs contain adapters that help you read from external sources such as large cloud-based files, databases, or subscription services. The latter is primarily useful for testing and debugging.

Read from an external source

To read from an external source, you use one of the Beam-provided I/O adapters. The adapters vary in their exact usage, but all of them read from some external data source and return a PCollection whose elements represent the data records in that source.

Each data source adapter has a Read transform; to read, you must apply that transform to the Pipeline object itself. For example, TextIO.Read reads from an external text file and returns a PCollection whose elements are of type String, with each String representing one line of the text file. Here is how you would apply TextIO.Read to your Pipeline to create a PCollection:

public static void main(String[] args) {
    // Create the pipeline.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    // Create the PCollection 'lines' by applying a 'Read' transform.
    PCollection<String> lines = p.apply(
        "ReadMyFile", TextIO.read().from("protocol://path/to/some/inputData.txt"));
}

See the I/O section for more information on how to read from the various data sources supported by the Beam SDK.

Create PCollection from in-memory data

To create a PCollection from an in-memory Java Collection, you use the Beam-provided Create transform. Much like a data adapter's Read, you apply Create directly to your Pipeline object itself.

Create accepts a Java Collection and a Coder object as parameters. The Coder specifies how the elements in the Collection should be encoded.

The following sample code shows how to create a PCollection from an in-memory List:

public static void main(String[] args) {
    // Create a Java Collection, in this case a List of Strings.
    final List<String> LINES = Arrays.asList(
        "To be, or not to be: that is the question: ",
        "Whether 'tis nobler in the mind to suffer ",
        "The slings and arrows of outrageous fortune, ",
        "Or to take arms against a sea of troubles, ");

    // Create the pipeline.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    // Apply Create, passing the list and the coder, to create the PCollection.
    p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of());
}

3.2 PCollection characteristics

A PCollection is owned by the specific Pipeline object for which it is created; multiple pipelines cannot share a PCollection. In some respects, a PCollection functions like a collection class. However, a PCollection differs in a few key ways:

Element type

The elements of a PCollection may be of any type, but they must all be of the same type. However, to support distributed processing, Beam needs to be able to encode each individual element as a byte string (so that elements can be passed around to distributed workers). The Beam SDKs provide a data encoding mechanism that includes built-in encodings for commonly used types, as well as support for specifying custom encodings as needed.
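As a rough illustration of what "encoding an element as a byte string" means, the following plain-Java sketch round-trips a String through UTF-8 bytes. The class Utf8CoderSketch and its methods are hypothetical and do not use the Beam Coder API; they only model the encode/decode pair that a coder for strings would have to provide.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Plain-Java model of an element encoding; hypothetical names, not the Beam Coder API.
final class Utf8CoderSketch {

    // Turns an element into the bytes that would be shipped to a worker.
    static byte[] encode(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    // Rebuilds the element from the bytes on the receiving side.
    static String decode(byte[] encoded) {
        return new String(encoded, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = encode("hello");
        System.out.println(Arrays.toString(wire));
        System.out.println(decode(wire));
    }
}
```

The property that matters is that decode(encode(x)) yields an element equal to x, so a value can cross machine boundaries without changing.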

Immutability

A PCollection is immutable. Once created, you cannot add, remove, or change individual elements. A Beam transform might process each element of a PCollection and generate new pipeline data (as a new PCollection), but it does not consume or modify the original input collection.

Random access

A PCollection does not support random access to individual elements. Instead, Beam transforms consider every element in a PCollection individually.

Size and boundedness

A PCollection is a large, immutable "bag" of elements. There is no upper limit on how many elements a PCollection can contain; any given PCollection might fit in memory on a single machine, or it might represent a very large distributed dataset backed by a persistent data store.

A PCollection can be either bounded or unbounded in size. A bounded PCollection represents a dataset of a known, fixed size, while an unbounded PCollection represents a dataset of unlimited size. Whether a PCollection is bounded or unbounded depends on the source of the dataset that it represents. Reading from a batch data source, such as a file or a database, creates a bounded PCollection. Reading from a streaming or continuously updating data source, such as Pub/Sub or Kafka, creates an unbounded PCollection (unless you explicitly tell it not to).

The bounded (or unbounded) nature of your PCollection affects how Beam processes your data. A bounded PCollection can be processed by a batch job, which might read the entire dataset once and perform processing in a job of finite length. An unbounded PCollection must be processed by a continuously running streaming job, because the entire collection can never be available for processing at any one time.

When grouping elements of an unbounded PCollection, Beam relies on windowing, which divides the continuously updating dataset into logical windows of finite size. Beam treats each window as a bundle, and processing continues as the dataset is generated. These logical windows are determined by some characteristic associated with a data element, such as a timestamp.
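One simple way to picture windows of finite size is to bucket elements by timestamp into fixed-length intervals. The plain-Java sketch below assumes fixed windows aligned to the epoch and millisecond timestamps; the class FixedWindowSketch and its methods are hypothetical and are not Beam's windowing API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of fixed-size windows; hypothetical names, not the Beam windowing API.
final class FixedWindowSketch {

    // Start of the fixed window that contains the given timestamp.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    }

    // Groups values by the window their timestamp falls into (timestamps[i] belongs to values[i]).
    static Map<Long, List<String>> groupByWindow(long[] timestamps, String[] values, long windowSizeMillis) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (int i = 0; i < values.length; i++) {
            long start = windowStart(timestamps[i], windowSizeMillis);
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(values[i]);
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 59_000L, 61_000L, 125_000L};
        String[] values = {"a", "b", "c", "d"};
        // One-minute windows: prints {0=[a, b], 60000=[c], 120000=[d]}
        System.out.println(groupByWindow(timestamps, values, 60_000L));
    }
}
```

Each bucket is finite even though new elements may keep arriving, which is what makes grouping over an unbounded dataset tractable.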

Element timestamp

Each element in a PCollection has an associated intrinsic timestamp. The timestamp for each element is initially assigned by the Source that creates the PCollection. Sources that create an unbounded PCollection often assign each new element a timestamp that corresponds to when the element was read or added.

Note: Sources that create a bounded PCollection for a fixed dataset also automatically assign timestamps, but the most common behavior is to assign every element the same timestamp (Long.MIN_VALUE).

Timestamps are useful for a PCollection whose elements have an inherent notion of time. If your pipeline is reading a stream of events, such as tweets or other social media messages, each element might use the time the event was posted as the element timestamp.

You can manually assign timestamps to the elements of a PCollection if the source does not do it for you. You will want to do this if the elements have an inherent timestamp, but the timestamp is somewhere in the structure of the element itself (such as a time field in a server log entry). Beam has transforms that take a PCollection as input and output an identical PCollection with timestamps attached. For more information about how to do this, see Assigning timestamps.
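The first half of that task, pulling the timestamp out of the element itself, can be sketched in plain Java. The example assumes a hypothetical log format in which each line starts with an ISO-8601 instant followed by a space; the class LogTimestampSketch is illustrative only and does not show the Beam transform that attaches the timestamp to the element.

```java
import java.time.Instant;

// Plain-Java sketch; the log format and all names here are assumptions for illustration.
final class LogTimestampSketch {

    // Expects "<ISO-8601 instant> <rest of entry>" and returns the leading instant.
    static Instant extractTimestamp(String logLine) {
        int space = logLine.indexOf(' ');
        String field = space < 0 ? logLine : logLine.substring(0, space);
        return Instant.parse(field); // throws DateTimeParseException on malformed input
    }

    public static void main(String[] args) {
        String entry = "2024-01-01T00:00:05Z GET /index.html";
        System.out.println(extractTimestamp(entry));
    }
}
```

In a pipeline, a function like this would run per element, and the extracted instant would become the element's timestamp in the output collection.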

4. Applying transforms

In the Beam SDKs, transforms are the operations in your pipeline. A transform takes a PCollection (or more than one PCollection) as input, performs an operation that you specify on each element in that collection, and produces a new output PCollection. To invoke a transform, you must apply it to the input PCollection.

The Beam SDKs contain a number of different transforms that you can apply to your pipeline's PCollections. These include general-purpose core transforms, such as ParDo or Combine. There are also pre-written composite transforms included in the SDKs, which combine one or more of the core transforms into a useful processing pattern, such as counting or combining elements in a collection. You can also define your own more complex composite transforms to fit your pipeline's exact use case.

Each transform in the Beam SDKs is applied through a generic apply method. Invoking multiple Beam transforms is similar to method chaining, with one slight difference: you apply the transform to the input PCollection, passing the transform itself as an argument, and the operation returns the output PCollection. This takes the general form:

[Output PCollection] = [Input PCollection].apply([Transform])

Because Beam uses a generic apply method for PCollection, you can both chain transforms sequentially and apply transforms that contain other transforms nested within them (called composite transforms in the Beam SDKs).

How you apply your pipeline's transforms determines the structure of your pipeline. The best way to think of your pipeline is as a directed acyclic graph, where the nodes are PCollections and the edges are transforms. For example, you can chain transforms to create a sequential pipeline, like this one:

[Final Output PCollection] = [Initial Input PCollection].apply([First Transform])
    .apply([Second Transform])
    .apply([Third Transform])

The resulting workflow graph of the above pipeline looks like this: [Sequential Graph Graphic]

Note, however, that a transform does not consume or otherwise alter the input collection; remember that a PCollection is immutable by definition. This means that you can apply multiple transforms to the same input PCollection to create a branching pipeline, like so:

[Output PCollection 1] = [Input PCollection].apply([Transform 1])
[Output PCollection 2] = [Input PCollection].apply([Transform 2])

The resulting workflow graph of the above pipeline looks like this: [Branching Graph Graphic]
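The chaining and branching patterns can be modeled in plain Java with immutable lists standing in for PCollections. This is only an analogy under stated assumptions: the class BranchingSketch and its apply helper are hypothetical, everything runs eagerly in one process, and nothing here uses the Beam SDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Plain-Java analogy; hypothetical names, not the Beam SDK.
final class BranchingSketch {

    // Applies a per-element function and returns a NEW unmodifiable list; the input is untouched.
    static <I, O> List<O> apply(List<I> input, Function<I, O> transform) {
        List<O> output = new ArrayList<>();
        for (I element : input) {
            output.add(transform.apply(element));
        }
        return Collections.unmodifiableList(output);
    }

    public static void main(String[] args) {
        List<String> words = Collections.unmodifiableList(Arrays.asList("cat", "dog", "horse"));

        // Branching: two different transforms read the same input.
        List<String> upper = apply(words, w -> w.toUpperCase());
        List<Integer> lengths = apply(words, w -> w.length());

        // Chaining: the output of one step feeds the next.
        List<Integer> upperLengths = apply(upper, w -> w.length());

        System.out.println(words);        // [cat, dog, horse]
        System.out.println(upper);        // [CAT, DOG, HORSE]
        System.out.println(lengths);      // [3, 3, 5]
        System.out.println(upperLengths); // [3, 3, 5]
    }
}
```

Because the input list is never modified, both branches see exactly the same elements, which is the property that makes branching safe.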

You can also build your own composite transforms that nest multiple sub-steps inside a single, larger transform. Composite transforms are particularly useful for building a reusable sequence of simple steps that get used in many different places.

4.1 Transforms in the Beam SDK

The transforms in the Beam SDKs provide a generic processing framework: you supply the processing logic in the form of a function object (colloquially referred to as "user code"), and your user code is applied to the elements of the input PCollection. Instances of your user code might then be executed in parallel by many different workers across a cluster, depending on the pipeline runner and back end that you choose to execute your Beam pipeline. The output elements produced by the user code running on each worker are ultimately added to the final output PCollection that the transform produces.

4.2 Core Beam transforms

Beam provides the following core transforms, each of which represents a different processing paradigm:

- ParDo

- GroupByKey

- Combine

- Flatten and Partition

4.2.1 Using ParDo

ParDo is a Beam transform for generic parallel processing. The ParDo processing paradigm is similar to the "Map" phase of a Map/Shuffle/Reduce-style algorithm: a ParDo transform considers each element in the input PCollection, performs some processing function (your user code) on that element, and emits zero, one, or multiple elements to an output PCollection.

ParDo is useful for a variety of common data processing operations, including:

- Filtering a dataset. You can use ParDo to consider each element in a PCollection and either output that element to a new collection or discard it.

- Formatting or type-converting each element in a dataset. If your input PCollection contains elements of a different type or format than you want, you can use ParDo to perform a conversion on each element and output the result to a new PCollection.

- Extracting parts of each element in a dataset. If you have a PCollection of records with multiple fields, for example, you can use a ParDo to parse out just the fields you want into a new PCollection.

- Performing computations on each element in a dataset. You can use ParDo to perform simple or complex computations on every element, or on certain elements, of a PCollection and output the results as a new PCollection.

In such roles, ParDo is a common intermediate step in a pipeline. You might use it to extract certain fields from a set of raw input records, or to convert raw input into a different format; you might also use ParDo to convert processed data into a format suitable for output, such as database table rows or printable strings.
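The "zero, one, or multiple outputs per input element" behavior can be modeled in plain Java as a function from one element to a list of outputs. The class ParDoSketch below is a hypothetical single-process model that does not use Beam's ParDo or DoFn; it only shows how filtering (zero outputs) and splitting (several outputs) fit the same shape.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of per-element processing; hypothetical names, not Beam's ParDo.
final class ParDoSketch {

    // Runs fn on every element and concatenates whatever each call emits.
    static <I, O> List<O> parDo(List<I> input, Function<I, List<O>> fn) {
        List<O> output = new ArrayList<>();
        for (I element : input) {
            output.addAll(fn.apply(element));
        }
        return output;
    }

    // Example user code: emits one output per word, and nothing for blank lines.
    static List<String> splitWords(String line) {
        String trimmed = line.trim();
        if (trimmed.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(trimmed.split("\\s+"));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("to be", "", "or");
        List<String> words = parDo(lines, ParDoSketch::splitWords);
        System.out.println(words); // [to, be, or]
    }
}
```

Here the blank line contributes no output, "or" contributes one, and "to be" contributes two, all through the same per-element function.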

When you apply a ParDo transform, you need to provide user code in the form of a DoFn object. DoFn is a Beam SDK class that defines a distributed processing function.

When you create a subclass of DoFn, note that your subclass should adhere to the requirements for writing user code for Beam transforms in Section 4.3.

Applying ParDo

As with all Beam transforms, you apply ParDo by calling the apply method on the input PCollection and passing ParDo as an argument, as shown in the following example code:

// The input PCollection of Strings.
PCollection<String> words = ...;

// The DoFn to perform on each element in the input PCollection.
static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }

// Apply a ParDo to the PCollection "words" to compute lengths for each word.
PCollection<Integer> wordLengths = words.apply(
    ParDo.of(new ComputeWordLengthFn()));  // The DoFn to perform on each element, defined above.

In the example, the input PCollection contains String values. We apply a ParDo transform that specifies a function (ComputeWordLengthFn) to compute the length of each string, and output the result to a new PCollection of Integer values that stores the length of each word.

Create DoFn

The DoFn object that you pass to ParDo contains the processing logic that gets applied to the elements in the input collection. When you use Beam, often the most important pieces of code you will write are these DoFns; they define your pipeline's exact data processing tasks.

Note: when you create your DoFn, be mindful of the requirements for writing user code for Beam transforms in Section 4.3, and make sure your code follows them.

A DoFn processes one element at a time from the input PCollection. When you create a subclass of DoFn, you need to provide type parameters that match the types of the input and output elements. If your DoFn processes incoming String elements and produces Integer elements for the output collection (like our previous example, ComputeWordLengthFn), your class declaration would look like this:

static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }

Inside your DoFn subclass, write a method annotated with @ProcessElement that provides the actual processing logic. You do not need to manually extract the elements from the input collection; the Beam SDKs handle that for you. Your @ProcessElement method should accept an object of type ProcessContext. The ProcessContext object gives you access to an input element and a method for emitting an output element:

static class ComputeWordLengthFn extends DoFn<String, Integer> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        // Get the input element from ProcessContext.
        String word = c.element();
        // Use ProcessContext.output to emit the output element.
        c.output(word.length());
    }
}

Note: if the elements in your input PCollection are key/value pairs, you can access the key or value by using ProcessContext.element().getKey() or ProcessContext.element().getValue(), respectively.

A given DoFn instance generally gets invoked one or more times to process some arbitrary bundle of elements. However, Beam does not guarantee an exact number of invocations; it may be invoked multiple times on a given worker node to account for failures and retries. As such, you can cache information across multiple calls to your processing method, but if you do so, make sure the implementation does not depend on the number of invocations.

In your processing method, you also need to meet some immutability requirements to ensure that Beam and the processing back end can safely serialize and cache the values in your pipeline. Your method should meet the following requirements:

- You should not in any way modify an element returned by ProcessContext.element() or ProcessContext.sideInput() (the incoming elements from the input collection).

- Once you output a value using ProcessContext.output() or ProcessContext.sideOutput(), you should not modify that value in any way.

Lightweight DoFns and other abstractions

If your function is relatively straightforward, you can simplify your use of ParDo by providing a lightweight DoFn in-line, as an anonymous inner class instance.

Here is the previous example, ParDo with ComputeWordLengthFn, with the DoFn specified as an anonymous inner class instance:

// The input PCollection.
PCollection<String> words = ...;

// Apply a ParDo with an anonymous DoFn to the PCollection words.
// Save the result as the PCollection wordLengths.
PCollection<Integer> wordLengths = words.apply(
    "ComputeWordLengths",                    // the transform name
    ParDo.of(new DoFn<String, Integer>() {   // a DoFn as an anonymous inner class instance
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(c.element().length());
        }
    }));

If your ParDo performs a one-to-one mapping of input elements to output elements, that is, for each input element it applies a function that produces exactly one output element, you can use the higher-level MapElements transform. MapElements can accept an anonymous Java 8 lambda function for additional brevity.

The following is the previous example using MapElements:

// The input PCollection.
PCollection<String> words = ...;

// Apply a MapElements with an anonymous lambda function to the PCollection words.
// Save the result as the PCollection wordLengths.
PCollection<Integer> wordLengths = words.apply(
    MapElements.into(TypeDescriptors.integers())
        .via((String word) -> word.length()));

Note: you can use Java 8 lambda functions with several other Beam transforms, including Filter, FlatMapElements, and Partition.

4.2.2 Using GroupByKey

GroupByKey is a Beam transform for processing collections of key/value pairs. It is a parallel reduction operation, analogous to the Shuffle phase of a Map/Shuffle/Reduce-style algorithm. The input to GroupByKey is a collection of key/value pairs that represents a multimap, where the collection contains multiple pairs that have the same key but different values. Given such a collection, you use GroupByKey to collect all of the values associated with each unique key.

GroupByKey is a good way to aggregate data that has something in common. For example, if you have a collection that stores records of customer orders, you might want to group together all the orders from the same postal code (where the "key" of the key/value pair is the postal code field, and the "value" is the remainder of the record).

Let's examine the mechanics of GroupByKey with a simple example, where our dataset consists of words from a text file and the line numbers on which they appear. We want to group together all the line numbers (values) that share the same word (key), letting us see all the places in the text where one particular word appears.

Our input is a PCollection of key/value pairs where each word is a key, and the value is a line number in the file where the word appears. Here is a list of the key/value pairs in the input collection:

cat, 1
dog, 5
and, 1
jump, 3
tree, 2
cat, 5
dog, 2
and, 2
cat, 9
and, 6
...

GroupByKey gathers up all the values with the same key and outputs a new pair consisting of the unique key and a collection of all of the values that were associated with that key in the input collection. If we apply GroupByKey to our input collection above, the output collection would look like this:

cat, [1,5,9]
dog, [5,2]
and, [1,2,6]
jump, [3]
tree, [2]
...

Thus, GroupByKey represents a transform from a multimap (multiple keys to individual values) to a uni-map (unique keys to collections of values).
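The grouping semantics of the word/line-number example can be reproduced with a small plain-Java model. The class GroupByKeySketch is hypothetical and runs in a single process; it is not Beam's GroupByKey, and unlike this sketch a distributed runner makes no promise about the order of keys or of the values within a group.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of grouping by key; hypothetical names, not Beam's GroupByKey.
final class GroupByKeySketch {

    // Collects every value under its key, keeping keys in first-seen order.
    static <K, V> Map<K, List<V>> groupByKey(List<Map.Entry<K, V>> pairs) {
        Map<K, List<V>> grouped = new LinkedHashMap<>();
        for (Map.Entry<K, V> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // Small helper to build a key/value pair.
    static Map.Entry<String, Integer> kv(String key, int value) {
        return new SimpleImmutableEntry<>(key, value);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = Arrays.asList(
            kv("cat", 1), kv("dog", 5), kv("and", 1), kv("jump", 3), kv("tree", 2),
            kv("cat", 5), kv("dog", 2), kv("and", 2), kv("cat", 9), kv("and", 6));
        // Prints {cat=[1, 5, 9], dog=[5, 2], and=[1, 2, 6], jump=[3], tree=[2]}
        System.out.println(groupByKey(input));
    }
}
```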

Joins with CoGroupByKey

CoGroupByKey joins two or more key/value PCollections that have the same key type, and then emits a collection of KV<K, CoGbkResult> pairs. Design Your Pipeline shows an example pipeline that uses a join.

Given the input collections below:

// collection 1
user1, address1
user2, address2
user3, address3

// collection 2
user1, order1
user1, order2
user2, order3
guest, order4
...

CoGroupByKey gathers up the values with the same key from all PCollections and outputs a new pair consisting of the unique key and a CoGbkResult object containing all values that were associated with that key. If you apply CoGroupByKey to the input collections above, the output collection would look like this:

user1, [[address1], [order1, order2]]
user2, [[address2], [order3]]
user3, [[address3], []]
guest, [[], [order4]]
...
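The join result above can be modeled in plain Java as a map from each key to two lists, one per input collection. The class CoGroupSketch is a hypothetical single-process model: it does not use Beam's CoGroupByKey or CoGbkResult, it handles exactly two inputs, and it sorts keys only so that the printed output is stable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of a two-input co-group; hypothetical names, not Beam's CoGroupByKey.
final class CoGroupSketch {

    // Helper to build a {key, value} pair.
    static String[] kv(String key, String value) {
        return new String[] {key, value};
    }

    // For every key, slot 0 holds values from 'first' and slot 1 holds values from 'second'.
    static Map<String, List<List<String>>> coGroup(List<String[]> first, List<String[]> second) {
        Map<String, List<List<String>>> result = new TreeMap<>();
        addAll(result, first, 0);
        addAll(result, second, 1);
        return result;
    }

    private static void addAll(Map<String, List<List<String>>> result, List<String[]> pairs, int slot) {
        for (String[] pair : pairs) {
            List<List<String>> slots = result.get(pair[0]);
            if (slots == null) {
                // A key seen for the first time starts with two empty value lists.
                slots = new ArrayList<>();
                slots.add(new ArrayList<>());
                slots.add(new ArrayList<>());
                result.put(pair[0], slots);
            }
            slots.get(slot).add(pair[1]);
        }
    }

    public static void main(String[] args) {
        List<String[]> addresses = Arrays.asList(
            kv("user1", "address1"), kv("user2", "address2"), kv("user3", "address3"));
        List<String[]> orders = Arrays.asList(
            kv("user1", "order1"), kv("user1", "order2"), kv("user2", "order3"), kv("guest", "order4"));
        System.out.println(coGroup(addresses, orders));
    }
}
```

Keys that appear in only one input still show up in the result, with an empty list in the other slot, which matches the user3 and guest rows above.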

Note on key/value pairs: Beam represents key/value pairs slightly differently depending on the language and SDK you use. In the Beam SDK for Java, you represent a key/value pair with an object of type KV<K, V>. In Python, you represent key/value pairs with 2-tuples.

4.2.3 Using Combine

Combine is a Beam transform for combining collections of elements or values in your data. Combine has variants that work on entire PCollections, and some that combine the values for each key in a PCollection of key/value pairs.

When you apply a Combine transform, you must provide the function that contains the logic for combining the elements or values. The combining function should be commutative and associative, as the function is not necessarily invoked exactly once on all values with a given key. Because the input data (including the value collection) may be distributed across multiple workers, the combining function might be called multiple times to perform partial combining on subsets of the value collection. The Beam SDK also provides some pre-built combine functions for common numeric combination operations such as sum, min, and max.

Simple combine operations, such as sums, can usually be implemented as a simple function. More complex combination operations might require you to create a subclass of CombineFn that has an accumulation type distinct from the input/output type.

Simple combinations using simple functions

The following example code shows a simple combine function.

// Sum a collection of Integer values. The function SumInts implements the interface SerializableFunction.
public static class SumInts implements SerializableFunction<Iterable<Integer>, Integer> {
    @Override
    public Integer apply(Iterable<Integer> input) {
        int sum = 0;
        for (int item : input) {
            sum += item;
        }
        return sum;
    }
}

Advanced combinations using CombineFn

For more complex combine functions, you can define a subclass of CombineFn. You should use CombineFn if the combine function requires a more sophisticated accumulator, must perform additional pre- or post-processing, might change the output type, or takes the key into account.

A general combined operation consists of four operations. When creating a subclass, CombineFn must provide four actions by overriding the corresponding methods:

1. Create accumulator create a new "local" accumulator. In the example case, taking the average, the local accumulator tracks the running value (the numerator value of our final average division) and the sum value (denominator value) so far. It can be called any number of times in a distributed way.

2. Add Input adds an input element to the accumulator and returns the accumulator value. In our example, it updates the sum and increases the count. You can also call it in parallel.

3. Merge accumulators merge multiple accumulators into a single accumulator; this is how to combine data from multiple accumulators before final calculation. In the case of average calculation, accumulators representing each part of the partition are merged. Its output may be called multiple times again.

4. Extract the output to perform the final calculation. In the case of calculating the average, this means that the combination of all values is divided by the number of summation. The accumulator in the final merge is called once.

The following sample code shows how to define an CombineFn calculated average:

    import org.apache.beam.sdk.transforms.Combine.CombineFn;

    // Input type Integer, accumulator type Accum, output type Double.
    public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
      // Accumulator: running sum and number of values seen so far.
      public static class Accum {
        int sum = 0;
        int count = 0;
      }

      @Override
      public Accum createAccumulator() { return new Accum(); }

      @Override
      public Accum addInput(Accum accum, Integer input) {
        accum.sum += input;
        accum.count++;
        return accum;
      }

      @Override
      public Accum mergeAccumulators(Iterable<Accum> accums) {
        Accum merged = createAccumulator();
        for (Accum accum : accums) {
          merged.sum += accum.sum;
          merged.count += accum.count;
        }
        return merged;
      }

      @Override
      public Double extractOutput(Accum accum) {
        return ((double) accum.sum) / accum.count;
      }
    }
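To make the call order of the four operations concrete, here is a minimal plain-Java sketch with no Beam dependency. The class MeanLifecycleSketch, the helper combinePartition, and the two-partition layout are illustrative assumptions; a real runner decides how many accumulators to create and when to merge them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MeanLifecycleSketch {
    // Accumulator with the same shape as in the mean example: sum and count.
    static final class Accum {
        int sum = 0;
        int count = 0;
    }

    static Accum createAccumulator() {
        return new Accum();
    }

    static Accum addInput(Accum accum, int input) {
        accum.sum += input;
        accum.count++;
        return accum;
    }

    static Accum mergeAccumulators(List<Accum> accums) {
        Accum merged = createAccumulator();
        for (Accum accum : accums) {
            merged.sum += accum.sum;
            merged.count += accum.count;
        }
        return merged;
    }

    static double extractOutput(Accum accum) {
        return ((double) accum.sum) / accum.count;
    }

    // Hypothetical stand-in for one worker: folds its partition into a local accumulator.
    static Accum combinePartition(List<Integer> partition) {
        Accum local = createAccumulator();
        for (int value : partition) {
            local = addInput(local, value);
        }
        return local;
    }

    // Combines each partition independently, merges the partial results, extracts once.
    static double mean(List<List<Integer>> partitions) {
        List<Accum> partials = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            partials.add(combinePartition(partition));
        }
        return extractOutput(mergeAccumulators(partials));
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(
            Arrays.asList(1, 2, 3),
            Arrays.asList(4, 5));
        System.out.println(mean(partitions)); // 15 / 5 = 3.0
    }
}
```

Because the merge step only adds sums and counts, the result does not depend on how the input is split into partitions, which is what allows the first three operations to run in parallel.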

If you are combining a PCollection of key-value pairs, per-key combining is usually sufficient. If you need the combining strategy to change based on the key (for example, MIN for some users and MAX for others), you can define a KeyedCombineFn, which can access the key within the combining strategy.

Combining a PCollection into a single value

Use a global combine to transform all of the elements in a given PCollection into a single value, represented in your pipeline as a new PCollection containing one element. The following sample code shows how to apply the sum combine function provided by Beam to produce a single sum value for a PCollection of integers.

    // Sum.SumIntegerFn() combines the elements in the input PCollection.
    // The resulting PCollection, called sum, contains one value: the sum of all the elements in the input PCollection.
    PCollection<Integer> pc = ...;
    PCollection<Integer> sum = pc.apply(
        Combine.globally(new Sum.SumIntegerFn()));

Global window:

If your input PCollection uses the default global windowing, the default behavior is to return a PCollection containing one item. The value of that item comes from the accumulator in the combine function that you specified when applying Combine. For example, the sum combine function provided by Beam returns a zero value (the sum of an empty input), while the min combine function returns a maximal or infinite value.

To have Combine return an empty PCollection instead when the input is empty, specify .withoutDefaults when you apply your Combine transform, as in the following code example:
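The default item is what the combine function yields when no input was ever added to its accumulator. The following is a minimal plain-Java sketch of that idea; it does not use Beam's classes, and the class and method names are hypothetical.

```java
import java.util.Collections;
import java.util.List;

public class EmptyInputDefaults {
    // Sum starts from 0, so an empty input yields 0.
    static int sum(List<Integer> input) {
        int acc = 0;
        for (int v : input) {
            acc += v;
        }
        return acc;
    }

    // Min starts from the largest int, so an empty input yields Integer.MAX_VALUE.
    static int min(List<Integer> input) {
        int acc = Integer.MAX_VALUE;
        for (int v : input) {
            acc = Math.min(acc, v);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Integer> empty = Collections.emptyList();
        System.out.println(sum(empty)); // 0
        System.out.println(min(empty)); // 2147483647
    }
}
```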

    PCollection<Integer> pc = ...;
    PCollection<Integer> sum = pc.apply(
        Combine.globally(new Sum.SumIntegerFn()).withoutDefaults());

Non-global window:

If your PCollection uses any non-global windowing function, Beam does not provide the default behavior. You must specify one of the following options when applying Combine:

Specify .withoutDefaults, where windows that are empty in the input PCollection are likewise empty in the output collection.

Specify .asSingletonView, where the output is immediately converted to a PCollectionView, which provides a default value for each empty window when used as a side input. You generally only need this option if the result of your pipeline's Combine is used as a side input later in the pipeline.

Combining values in a key-grouped collection

After creating a key-grouped collection (for example, by using a GroupByKey transform), a common pattern is to combine the collection of values associated with each key into a single merged value. Drawing on the previous GroupByKey example, a key-grouped PCollection called groupedWords looks like this:

    cat, [1,5,9]
    dog, [5,2]
    and, [1,2,6]
    jump, [3]
    tree, [2]
    ...

In the PCollection above, each element has a string key (for example, "cat") and an iterable of integers for its value (in the first element, containing [1, 5, 9]). If the next processing step of our pipeline combines the values (rather than considering them individually), you can combine the iterable of integers to create a single merged value to be paired with each key. This pattern of a GroupByKey followed by merging the collection of values is equivalent to Beam's Combine PerKey transform. The combine function you supply to Combine PerKey must be an associative reduction function or a subclass of CombineFn.
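As a rough illustration of this pattern in plain Java (no Beam classes are used; the map stands in for the key-grouped PCollection, and sumPerKey is a hypothetical helper), combining each key's iterable into one value can be sketched as:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PerKeySumSketch {
    // Reduces the list of values for each key to a single sum, keeping key order.
    static Map<String, Integer> sumPerKey(Map<String, List<Integer>> grouped) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int sum = 0;
            for (int value : entry.getValue()) {
                sum += value;
            }
            result.put(entry.getKey(), sum);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> groupedWords = new LinkedHashMap<>();
        groupedWords.put("cat", Arrays.asList(1, 5, 9));
        groupedWords.put("dog", Arrays.asList(5, 2));
        groupedWords.put("and", Arrays.asList(1, 2, 6));
        System.out.println(sumPerKey(groupedWords)); // {cat=15, dog=7, and=9}
    }
}
```

Addition is associative, so the partial sums for a key could be computed on separate workers and merged afterwards; that property is what the requirement on the combine function guarantees.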

    // PCollection is grouped by key (String keys are assumed here) and the Double values
    // associated with each key are combined into a Double.
    PCollection<KV<String, Double>> salesRecords = ...;
    PCollection<KV<String, Double>> totalSalesPerPerson =
        salesRecords.apply(Combine.perKey(new Sum.SumDoubleFn()));

    // The combined value is of a different type than the original collection of values per key.
    // PCollection has keys of type String and values of type Integer, and the combined value is a Double.
    PCollection<KV<String, Integer>> playerAccuracy = ...;
    PCollection<KV<String, Double>> avgAccuracyPerPlayer =
        playerAccuracy.apply(Combine.perKey(new MeanInts()));

4.2.4 Using Flatten and Partition

Flatten and Partition are Beam transforms for PCollection objects that store the same data type. Flatten merges multiple PCollection objects into a single logical PCollection, and Partition splits a single PCollection into a fixed number of smaller collections.

Flatten

The following example shows how to apply the Flatten transformation to merge multiple PCollection objects.

    // Flatten takes a PCollectionList of PCollection objects of a given type.
    // Returns a single PCollection that contains all of the elements in the PCollection objects in that list.
    // The element type String is an assumption made for illustration.
    PCollection<String> pc1 = ...;
    PCollection<String> pc2 = ...;
    PCollection<String> pc3 = ...;
    PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);

    PCollection<String> merged = collections.apply(Flatten.pCollections());

Data encoding in merged collections:

By default, the coder for the output PCollection is the same as the coder for the first PCollection in the input PCollectionList. However, the input PCollection objects can each use different coders, as long as they all contain the same data type in your chosen language.

Merging windowed collections:

When using Flatten to merge PCollection objects that have a windowing strategy applied, all of the PCollection objects you want to merge must use a compatible windowing strategy and window sizing. For example, all the collections you merge must use (hypothetically) identical 5-minute fixed windows, or 4-minute sliding windows starting every 30 seconds.

If your pipeline attempts to use Flatten to merge PCollection objects with incompatible windows, Beam generates an IllegalStateException error when your pipeline is constructed.

Partition

Partition divides the elements of a PCollection according to a partitioning function that you provide. The partitioning function contains the logic that determines how to split the elements of the input PCollection into each resulting partition PCollection. The number of partitions must be determined when the pipeline graph is constructed. For example, you can pass the number of partitions as a command-line option at run time (which is then used to build your pipeline graph), but you cannot determine the number of partitions mid-pipeline (for instance, based on data calculated after the pipeline graph is constructed).
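The partitioning function is an ordinary function from an element and the partition count to an index between 0 and numPartitions - 1. The following minimal plain-Java sketch shows that contract without using Beam; the class PartitionSketch and its methods are hypothetical, and bare integer percentiles in the range 0 to 99 stand in for real elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionSketch {
    // Maps a percentile in 0..99 to a partition index in 0..numPartitions-1.
    static int partitionFor(int percentile, int numPartitions) {
        return percentile * numPartitions / 100;
    }

    // Routes each element to the list chosen by partitionFor; the count is fixed up front.
    static List<List<Integer>> partition(List<Integer> percentiles, int numPartitions) {
        List<List<Integer>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            parts.add(new ArrayList<>());
        }
        for (int p : percentiles) {
            parts.get(partitionFor(p, numPartitions)).add(p);
        }
        return parts;
    }

    public static void main(String[] args) {
        List<List<Integer>> parts = partition(Arrays.asList(5, 42, 47, 99), 10);
        System.out.println(parts.get(4)); // [42, 47]
        System.out.println(parts.get(9)); // [99]
    }
}
```

Note that the number of output lists is created before any element is examined, mirroring the rule that the partition count cannot depend on data computed inside the pipeline.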

The following example divides a PCollection into percentile groups.

    // Provide an int value with the desired number of result partitions, and a PartitionFn that represents the partitioning function.
    // In this example, we define the PartitionFn in-line.
    // Returns a PCollectionList containing each of the resulting partitions as individual PCollection objects.
    PCollection<Student> students = ...;
    // Split students up into 10 partitions, by percentile:
    PCollectionList<Student> studentsByPercentile =
        students.apply(Partition.of(10, new PartitionFn<Student>() {
            public int partitionFor(Student student, int numPartitions) {
                return student.getPercentile()  // 0..99
                    * numPartitions / 100;
            }}));

    // You can extract each partition from the PCollectionList using the get method, as follows:
    PCollection<Student> fortiethPercentile = studentsByPercentile.get(4);

4.3 General requirements for writing user code for Beam transforms

4.4 Side inputs

4.5 Additional outputs

5. Composite transforms

6. Pipeline I/O

7. Data encoding and type safety

8. Using windowing

9. Using triggers
