
What are the problems in using Flink?

2025-01-15 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/01 Report--

This article covers problems encountered while using Flink. It has some reference value; I hope you learn something useful from it.

Do not use "result" as the table name when registering a table

tableEnv.registerTable("result_agg", table)

Register it as above. If instead you write

tableEnv.registerTable("result", table)

then the following error is reported, because "result" is a reserved word in the SQL parser:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL parse failed. Encountered "from result" at line 1, column 13. Was expecting one of: "ORDER" ... "LIMIT" ... "OFFSET" ... "FETCH" ... "FROM" ... "FROM" "LATERAL" ... "FROM" "(" ... "FROM" "UNNEST" ... "FROM" "TABLE" ... "AS" ...

If the MySQL column type is tinyint, you should cast it as shown below; otherwise Flink reports an error:

Caused by: java.lang.ClassCastException: java.lang.Boolean cannot be cast to java.lang.String
    at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:33)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:161)

The cast is very simple, for example:

select project_fid, cast(project_info_type as CHAR) as type from table

If one side of a join contains data of type map (for example, produced by the collect aggregate function, which is similar to MySQL's group_concat), a null pointer exception is thrown, similar to:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Error while applying rule FlinkLogicalJoinConverter, args [rel#56:LogicalJoin.NONE(left=rel#52:Subset#0.NONE, right=rel#55:Subset#2.NONE, condition==($0, $2), joinType=inner)]

This bug has not been fixed yet; see https://issues.apache.org/jira/browse/FLINK-11433

The only workaround is to convert the map-typed data into a string with a user-defined function.

import java.util.Map;
import org.apache.flink.table.functions.ScalarFunction;

public class MapToString extends ScalarFunction {
    public String eval(Map map) {
        if (map == null || map.size() == 0) {
            return "";
        }
        StringBuffer sb = new StringBuffer();
        for (Map.Entry entity : map.entrySet()) {
            sb.append(entity.getKey() + ",");
        }
        // drop the trailing comma
        String result = sb.toString();
        return result.substring(0, result.length() - 1);
    }
}

Call it like this:

select id, mapToString(collect(type)) as type from table group by id

Of course, you also need to register the function first:

tableEnv.registerFunction("mapToString", new MapToString())

Type conversion error

Recently I keep running into type conversion errors. So far I have found two; they are recorded here.

a. A tinyint(1) column is automatically converted to boolean. Besides the cast solution above, a more elegant fix is to modify the MySQL connection URL and add the parameter tinyInt1isBit=false (note the case).
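As a sketch, such a JDBC URL might look like the following; the host, port, and database name are placeholders, while tinyInt1isBit is the Connector/J property (it defaults to true):

```text
jdbc:mysql://localhost:3306/mydb?tinyInt1isBit=false
```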

b. Sometimes the id field of the MySQL database is clearly int, but Flink identifies it as long. MyBatis seems to have had this problem before (https://blog.csdn.net/ahwsk/article/details/81975117).

Later I looked carefully at the table design (someone else's table) and found that the "unsigned" option was checked. After removing this option and running again, no error was reported, so unsigned appears to be what makes Flink's conversion fail. An unsigned column has one more bit of positive range than a signed one, and that extra range may exceed the range of Java's int (Java ints are all signed), so the value is automatically widened to long.
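The range argument above can be checked with a small standalone Java snippet; the class and method names here are made up for illustration and are not part of any Flink or JDBC API:

```java
public class UnsignedIntRange {
    // MySQL's unsigned INT spans 0 .. 4294967295, which exceeds Java's
    // signed int maximum (2147483647), so JDBC drivers widen unsigned
    // INT columns to Java long.
    public static final long MYSQL_UNSIGNED_INT_MAX = 4294967295L;

    public static boolean exceedsJavaIntRange(long value) {
        return value > Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        // The unsigned maximum does not fit in a Java int
        System.out.println(exceedsJavaIntRange(MYSQL_UNSIGNED_INT_MAX)); // prints true
    }
}
```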

Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory'

Although the fat jar already contains the corresponding class, the error is still reported. The final solution is to also add the relevant jar to Flink's lib directory.

Cannot assign instance of org.apache.commons.collections.map.LinkedMap to field org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit of type org.apache.commons.collections.map.LinkedMap in instance of org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

This error requires adding classloader.resolve-order: parent-first to flink-conf.yaml.
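A minimal flink-conf.yaml fragment for this fix:

```yaml
# Resolve classes through the parent (Flink) classloader before the user
# jar, so only one copy of commons-collections' LinkedMap is loaded.
classloader.resolve-order: parent-first
```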

Caused by: java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@flink88:15265/user/taskmanager_0#66653408]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation".

When this error occurs, add the following to Flink's configuration:

akka.ask.timeout: 120s
web.timeout: 120000

Exception while invoking ApplicationClientProtocolPBClientImpl.forceKillApplication over null. Retrying after sleeping for 30000ms

This error occurred when submitting the task. Use yarn logs -applicationId application_1565600987111 to check the logs and find the cause. The cause I encountered was that the akka.watch.heartbeat.pause value was less than akka.watch.heartbeat.interval; the error disappeared after fixing that.
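As an illustrative flink-conf.yaml fragment (the values are examples, not recommendations), the pause must be at least as large as the interval:

```yaml
# akka.watch.heartbeat.pause must not be smaller than the interval
akka.watch.heartbeat.interval: 10s
akka.watch.heartbeat.pause: 60s
```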

Another option is to kill the CliFrontend process.

Exception while invoking ApplicationClientProtocolPBClientImpl.forceKillApplication over null. Retrying after sleeping for 30000ms.
java.io.IOException: The client is stopped
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1519)
    at org.apache.hadoop.ipc.Client.call(Client.java:1381)
    at org.apache.hadoop.ipc.Client.call(Client.java:1345)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)

This exception is still a memory problem. Check whether memory is sufficient; look at free memory, not available memory. If the latter is high, execute the following two commands to release the cache.

sync
echo 3 > /proc/sys/vm/drop_caches


Caused by: java.net.BindException: Could not start rest endpoint on any port in port range 8081
    at org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:219)
    at org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentFactory.java:161)
    ... 9 more

This error means the port is occupied. Looking at the source code:

Iterator portsIterator;
try {
    portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
} catch (IllegalConfigurationException e) {
    throw e;
} catch (Exception e) {
    throw new IllegalArgumentException("Invalid port range definition: " + restBindPortRange);
}

The corresponding configuration is rest.bind-port in flink-conf.yaml.

If rest.bind-port is not set, the REST server binds to the rest.port port (8081 by default).

rest.bind-port can be set to a list format such as 50100,50101, or a range format such as 50100-50200. The range format is recommended to avoid port conflicts.
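A flink-conf.yaml fragment using the range format (the range itself is an example):

```yaml
# Try each port in the range until a free one is found
rest.bind-port: 50100-50200
```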
