In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-02 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
In this issue, the editor will bring you the deployer module about how to use the source code to analyze canal. The article is rich in content and analyzes and narrates it from a professional point of view. I hope you can get something after reading this article.
CanalLauncher is the startup entry class
Get canal.properties profile
If the attribute root.admin.manager in the canal.properties configuration file has a value, then construct PlainCanalConfigClient, call PlainCanalConfigClient's findServer to get PlainCanal, and call PlainCanal's getProperties method to get properties
Construct CanalStarter through properties and call its start method
CanalStarter is the startup class
Public synchronized void start () throws Throwable {/ / first constructs CanalMQProducer based on canal.serverMode. If it is kafka, it constructs CanalKafkaProducer String serverMode = CanalController.getProperty (properties, CanalConstants.CANAL_SERVER_MODE); if (serverMode.equalsIgnoreCase ("kafka")) {canalMQProducer = new CanalKafkaProducer ();} else if (serverMode.equalsIgnoreCase ("rocketmq")) {canalMQProducer = new CanalRocketMQProducer () } if (canalMQProducer! = null) {/ / disable netty System.setProperty (CanalConstants.CANAL_WITHOUT_NETTY, "true"); / / set to raw to avoid secondary resolution of ByteString- > Entry System.setProperty ("canal.instance.memory.rawEntry", "false") } / / next construct CanalController and call its start method logger.info ("# # start the canal server."); controller = new CanalController (properties); controller.start (); logger.info ("# # the canal server is running now.") . / / construct CanalMQStarter and call its start method, and set the property if (canalMQProducer! = null) {canalMQStarter = new CanalMQStarter (canalMQProducer); MQProperties mqProperties = buildMQProperties (properties); String destinations = CanalController.getProperty (properties, CanalConstants.CANAL_DESTINATIONS); canalMQStarter.start (mqProperties, destinations); controller.setCanalMQStarter (canalMQStarter) to CanalController. }... Running = true;}
CanalController is the instance scheduling controller
Public CanalController (final Properties properties) {/ / initialize managerClients to request admin managerClients = MigrateMap.makeComputingMap (new Function () {public PlainCanalConfigClient apply (String managerAddress) {return getManagerClient (managerAddress);}}) / / initialize global parameter settings, including global mode, lazy, managerAddress, and springXml. Initialize instanceGenerator is used to create instance, which uses PlainCanalInstanceGenerator or SpringCanalInstanceGenerator to create CanalInstance globalInstanceConfig = initGlobalConfig (properties) based on the mode value of InstanceConfig; instanceConfigs = new MapMaker (). MakeMap (); / initialize instance config, including instances mode, lazy, managerAddress, springXml initInstanceConfig (properties) Initialize CanalServerWithEmbedded, set instanceGenerator to the property embededCanalServer = CanalServerWithEmbedded.instance (); embededCanalServer.setCanalInstanceGenerator (instanceGenerator) of CanalServerWithEmbedded; / / set custom instanceGenerator int metricsPort = Integer.valueOf (getProperty (properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112")); embededCanalServer.setMetricsPort (metricsPort); this.adminUser = getProperty (properties, CanalConstants.CANAL_ADMIN_USER) This.adminPasswd = getProperty (properties, CanalConstants.CANAL_ADMIN_PASSWD); embededCanalServer.setUser (getProperty (properties, CanalConstants.CANAL_USER)); embededCanalServer.setPasswd (getProperty (properties, CanalConstants.CANAL_PASSWD)); Final String zkServers = getProperty (properties, CanalConstants.CANAL_ZKSERVERS); / / initialize ZkClientx for canal cluster deployment, create / otteradmin/canal/destinations node and / otteradmin/canal/cluster node if (StringUtils.isNotEmpty (zkServers)) {zkclientx = ZkClientx.getZkClient (zkServers); / / initialize the system directory zkclientx.createPersistent (ZookeeperPathUtils.DESTINATION_ROOT_NODE, true) Zkclientx.createPersistent (ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);} / initialize the ServerRunningMonitor of ServerRunningMonitors, which is used to start and shut down the instance final ServerRunningData serverData = new ServerRunningData (registerIp + ":" + port); ServerRunningMonitors.setServerData (serverData); ServerRunningMonitors.setRunningMonitors (MigrateMap.makeComputingMap (new Function () {...}) / / initialize InstanceAction, which is used to start and close instances autoScan = BooleanUtils.toBoolean (getProperty (properties, CanalConstants.CANAL_AUTO_SCAN)); if (autoScan) {defaultAction = new InstanceAction () {...} / / initialize instanceConfigMonitors to get all instanceConfig and start all instance instanceConfigMonitors = MigrateMap.makeComputingMap (new Function () {public InstanceConfigMonitor apply (InstanceMode mode) {...}});}}
ManagerInstanceConfigMonitor is an instance scanner
Public void start () {super.start (); / / start a scheduled task to scan all instance executor.scheduleWithFixedDelay (new Runnable () {public void run () {try {scan (); if (isFirst) {isFirst = false) regularly }} catch (Throwable e) {logger.error ("scan failed", e);}, 0, scanIntervalInSecond, TimeUnit.SECONDS) } private void scan () {/ / caches all instance configurations. If a new instance is found and instance is started or modified, restart String instances = configClient.findInstances (null); final List is = Lists.newArrayList (StringUtils.split (instances,',')); List start = Lists.newArrayList (); List stop = Lists.newArrayList (); List restart = Lists.newArrayList () For (String instance: is) {if (! configs.containsKey (instance)) {PlainCanal newPlainCanal = configClient.findInstance (instance, null); if (newPlainCanal! = null) {configs.put (instance, newPlainCanal); start.add (instance) }} else {PlainCanal plainCanal = configs.get (instance); PlainCanal newPlainCanal = configClient.findInstance (instance, plainCanal.getMd5 ()); if (newPlainCanal! = null) {/ / configuration changes restart.add (instance); configs.put (instance, newPlainCanal) } configs.forEach ((instance, plainCanal)-> {if (! is.contains (instance)) {stop.add (instance);}}); stop.forEach (instance-> {notifyStop (instance);}) Restart.forEach (instance-> {notifyReload (instance);}); start.forEach (instance-> {notifyStart (instance);});} private void notifyStart (String destination) {try {/ / start instance call InstanceAction to start the instance, and finally call ServerRunningMonitor to start the instance defaultAction.start (destination) Actions.put (destination, defaultAction); / / record profile information after successful startup} catch (Throwable e) {logger.error (String.format ("scan add found [% s] but start failed", destination), e);}}
ServerRunningMonitor is the running instance control for server
Public ServerRunningMonitor () {/ / create parent node dataListener = new IZkDataListener () {public void handleDataChange (String dataPath, Object data) throws Exception {MDC.put ("destination", destination); ServerRunningData runningData = JsonUtils.unmarshalFromByte ((byte []) data, ServerRunningData.class) If (! isMine (runningData.getAddress () {mutex.set (false);} if (! runningData.isActive () & & isMine (runningData.getAddress () {/ / indicates that an active release operation has occurred, and the machine was previously active releaseRunning () / / completely release mainstem} activeData = (ServerRunningData) runningData;} public void handleDataDeleted (String dataPath) throws Exception {MDC.put ("destination", destination); mutex.set (false) If (! release & & activeData! = null & & isMine (activeData.getAddress () {/ / if the status of the last active is native, immediately trigger active to preempt initRunning () } else {/ / otherwise wait for delayTime to avoid frequent switching operation delayExector.schedule (new Runnable () {public void run () {initRunning ()) due to network instant or zk exception. }, delayTime, TimeUnit.SECONDS);};} public synchronized void start () {super.start (); try {/ / first calls listener's processStart method processStart () If (zkClient! = null) {/ / if you need to release instance resources as much as possible and do not need to listen to the running node, otherwise, even if the machine is stop, the other machine will immediately start / / monitor / otteradmin/canal/destinations/ {0} / running node changes String path = ZookeeperPathUtils.getDestinationServerRunning (destination); zkClient.subscribeDataChanges (path, dataListener) InitRunning ();} else {processActiveEnter (); / / without zk, directly launch}} catch (Exception e) {logger.error ("start failed", e); / / do not start normally, reset the state to avoid interfering with the next start stop () }} private void processStart () {if (listener! = null) {try {/ / processStart method creates a / otteradmin/canal/destinations/ {0} / cluster/ {1} node, where 0 is the instance name and 1 is the current node ip:port listener.processStart () } catch (Exception e) {logger.error ("processStart failed", e);} private void initRunning () {if (! isStart ()) {return;} String path = ZookeeperPathUtils.getDestinationServerRunning (destination); / / serialize byte [] bytes = JsonUtils.marshalToByte (serverData) Try {mutex.set (false); / / attempt to create / otteradmin/canal/destinations/ {0} / running node zkClient.create (path, bytes, CreateMode.EPHEMERAL); activeData = serverData; / / call listener's processEnter method if successful, and call CanalServerWithEmbedded's start method in the processEnter method to start the instance and CanalMQStarter's start method to start the instance processActiveEnter () / / trigger the event mutex.set (true); release = false;} catch (ZkNodeExistsException e) {bytes = zkClient.readData (path, true); if (bytes = = null) {/ / if no node exists, try initRunning () immediately } else {activeData = JsonUtils.unmarshalFromByte (bytes, ServerRunningData.class);}} catch (ZkNoNodeException e) {zkClient.createPersistent (ZookeeperPathUtils.getDestinationPath (destination), true); / / attempt to create a parent node initRunning ();}}
Canal.properties configuration
Canal.register.ip = canal.admin.manager = 127.0.0.1:8089canal.admin.port = 11110canal.admin.user = admincanal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441canal.admin.register.auto = truecanal.admin.register.cluster = the above is the deployer module shared by Xiaobian on how to analyze canal with source code. If you happen to have similar doubts, please refer to the above analysis to understand. If you want to know more about it, you are welcome to follow the industry information channel.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.