{"id":571,"date":"2020-06-02T14:01:58","date_gmt":"2020-06-02T06:01:58","guid":{"rendered":"http:\/\/www.guanhaobo.cn\/?p=571"},"modified":"2020-06-02T14:01:58","modified_gmt":"2020-06-02T06:01:58","slug":"%e5%a4%a7%e6%95%b0%e6%8d%ae%e6%8a%80%e6%9c%af%e5%ae%9e%e9%aa%8c%e5%9b%9b-kafka%e6%95%b0%e6%8d%ae%e9%87%87%e9%9b%86%e5%ae%9e%e9%aa%8c","status":"publish","type":"post","link":"https:\/\/www.guanhaobo.cn\/?p=571","title":{"rendered":"\u5927\u6570\u636e\u6280\u672f\u5b9e\u9a8c\u56db \u2014\u2014 Kafka\u6570\u636e\u91c7\u96c6\u5b9e\u9a8c"},"content":{"rendered":"<h3>1 \u5b9e\u9a8c\u76ee\u7684<\/h3>\n<ol>\n<li>\u4e86\u89e3Kafka\u4e2d\u7684\u751f\u4ea7\u8005\u3001\u6d88\u8d39\u8005\u3001topic\u7b49\u57fa\u672c\u6982\u5ff5\uff1b<\/li>\n<li>\u719f\u6089Kafka Shell\u5e38\u7528\u547d\u4ee4\uff1b<\/li>\n<li>\u5b66\u4e60\u4f7f\u7528Kafka\u7684Java API\uff0c\u7f16\u7a0b\u5b9e\u73b0Kafka\u5e38\u7528\u529f\u80fd\uff1b<\/li>\n<\/ol>\n<h3>2 \u5b9e\u9a8c\u73af\u5883<\/h3>\n<p>\u5b9e\u9a8c\u5e73\u53f0\uff1a\u57fa\u4e8e\u5b9e\u9a8c\u4e00\u642d\u5efa\u7684\u865a\u62df\u673aHadoop\u5927\u6570\u636e\u5b9e\u9a8c\u5e73\u53f0\u4e0a\u7684Kafka\u96c6\u7fa4\uff1b<br \/>\n\u7f16\u7a0b\u8bed\u8a00\uff1aJAVA\uff08\u63a8\u8350\u4f7f\u7528\uff09\u3001Python\u3001C++\u7b49\uff1b<\/p>\n<h3>3 \u5b9e\u9a8c\u5185\u5bb9<\/h3>\n<ol>\n<li>\u4f7f\u7528Kafka Shell\u547d\u4ee4\u5b8c\u6210\u4ee5\u4e0b\u4efb\u52a1\uff1a<br \/>\n(1). \u521b\u5efa\u4efb\u610ftopic<br \/>\n(2). \u521b\u5efa\u5411\u8be5topic\u53d1\u9001\u6570\u636e\u7684\u751f\u4ea7\u8005<br \/>\n(3). \u521b\u5efa\u8ba2\u9605\u8be5topic\u7684\u6d88\u8d39\u8005<\/li>\n<li>\u4f7f\u7528Java API\u7f16\u7a0b\u5b9e\u73b0\u4ee5\u4e0b\u4efb\u52a1\uff1a<br \/>\n(1). \u5b9e\u73b0\u751f\u4ea7\u8005\u7a0b\u5e8f\uff0c\u5411\u6307\u5b9atopic\u53d1\u9001\u6570\u636e<br \/>\n(2). \u5b9e\u73b0\u6d88\u8d39\u8005\u7a0b\u5e8f\uff0c\u4ece\uff081\uff09\u4e2d\u6307\u5b9a\u7684topic\u4e2d\u8ba2\u9605\u6570\u636e\u5e76\u5c06\u6d88\u8d39\u5f97\u5230\u7684\u6570\u636e\u5b58\u5230\u672c\u5730\u6587\u4ef6\u4e2d\u3002<\/li>\n<li>\u5bf9\u4ee5\u4e0a\u4e24\u90e8\u5206\u4efb\u52a1\u64b0\u5199\u5b9e\u9a8c\u62a5\u544a\uff0c\u5e76\u63d0\u4ea4\u5b9e\u9a8c\u4ee3\u7801\u3002<\/li>\n<\/ol>\n<h3>4 \u51c6\u5907\u5de5\u4f5c<\/h3>\n<pre><code class=\"language-cpp line-numbers\">\/\/\u4e09\u53f0\u90fd\u8981\u6267\u884c\n\/\/\u5207\u6362\u5230\u7528\u6237hadoop\nsu hadoop\n\/\/\u542f\u52a8zookeeper\nzkServer.sh start\n\/\/\u542f\u52a8kafka\nkafka-server-start.sh \/usr\/local\/kafka_2.10-0.8.2.1\/config\/server.properties &amp;\n<\/code><\/pre>\n<h3>5 Kafka Shell\u547d\u4ee4<\/h3>\n<h4>5.1 \u521b\u5efa\u4efb\u610ftopic<\/h4>\n<pre><code class=\"language-cpp line-numbers\">kafka-topics.sh --create --zookeeper cluster1:2181,cluster2:2181,cluster3:2181 --replication-factor 3 --partitions 1 --topic ghb123\n<\/code><\/pre>\n<p><img decoding=\"async\" src=\"http:\/\/www.guanhaobo.cn\/wp-content\/uploads\/2020\/06\/2001.jpg\" alt=\"\" \/><\/p>\n<h4>5.2 \u521b\u5efa\u5411\u8be5topic\u53d1\u9001\u6570\u636e\u7684\u751f\u4ea7\u8005<\/h4>\n<p>\u5728cluster1\u4e0a\u6267\u884c<\/p>\n<pre><code class=\"language-cpp line-numbers\">kafka-console-producer.sh --broker-list cluster1:9092 --topic ghb123\n<\/code><\/pre>\n<p>\u6ce8\u610f\uff0c\u8fd9\u91cc\u7684<code>cluster1:9092<\/code>\u5982\u679c\u5199\u6210<code>localhost:9092<\/code>\u53ef\u80fd\u4f1a\u51fa\u73b0\u9519\u8bef\uff0c\u9664\u975e\u4e8b\u5148\u5c06<code>localhost<\/code>\u7684ip\u6307\u5411\u4fee\u6539\u4e3a\u4e86\u5f53\u524dip\u3002<\/p>\n<h4>5.3 \u521b\u5efa\u8ba2\u9605\u8be5topic\u7684\u6d88\u8d39\u8005<\/h4>\n<p>\u5728cluster2\u4e0a\u6267\u884c<\/p>\n<pre><code class=\"language-cpp line-numbers\">kafka-console-consumer.sh -zookeeper cluster1:2181,cluster2:2181,cluster3:2181 --topic ghb123 --from-beginning\n<\/code><\/pre>\n<h4>5.4 \u9a8c\u8bc1\u6548\u679c<\/h4>\n<p>\u5728cluster1\u4e2d\u8f93\u5165\u5185\u5bb9\uff0c\u53ef\u4ee5\u5728cluster2\u4e2d\u663e\u793a\u51fa\u6765\u3002<br \/>\n<img decoding=\"async\" src=\"http:\/\/www.guanhaobo.cn\/wp-content\/uploads\/2020\/06\/2002.jpg\" alt=\"\" \/><\/p>\n<h3>6 Java API<\/h3>\n<h4>6.1 \u5b9e\u73b0\u751f\u4ea7\u8005\u7a0b\u5e8f\uff0c\u5411\u6307\u5b9atopic\u53d1\u9001\u6570\u636e<\/h4>\n<p>\u7f16\u5199\u7a0b\u5e8fghbProducer.java<\/p>\n<pre><code class=\"language-java line-numbers\">import java.io.BufferedReader;\nimport java.io.File;\nimport java.io.FileReader;\nimport java.io.IOException;\nimport java.util.Properties;\nimport java.util.Scanner;\nimport kafka.javaapi.producer.Producer;\nimport kafka.producer.KeyedMessage;\nimport kafka.producer.ProducerConfig;\n\npublic class ghbProducer {\n\n    public static void main(String[] args) {\n        Scanner in = new Scanner(System.in);\n        Properties props = new Properties();\n        props.put(\"serializer.class\", \"kafka.serializer.StringEncoder\");\n        props.put(\"metadata.broker.list\", \"cluster1:9092\");\n        Producer&lt;Integer, String&gt; producer = new Producer&lt;Integer, String&gt;(new ProducerConfig(props));\n        String topic;\n        System.out.print(\"\u8bf7\u8f93\u5165topic\u540d\u79f0\uff1a\");\n        topic = in.next();\n        File file = new File(\"kafka\u91c7\u96c6\u6570\u636e\u5b9e\u9a8c.txt\");\n        BufferedReader reader = null;\n        System.out.print(\"\u8bf7\u8f93\u5165\u53d1\u9001\u6570\u636e\u884c\u6570\uff1a\");\n        int num = in.nextInt();\n        try {\n            reader = new BufferedReader(new FileReader(file));\n            String tempString = null;\n            int line = 1;\n            while ((tempString = reader.readLine()) != null) {\n                producer.send(new KeyedMessage&lt;Integer, String&gt;(topic, tempString));\n                System.out.println(\"\u6210\u529f\u53d1\u9001\u7b2c \" + line + \" \u884c\u6570\u636e...\");\n                if (line == num)\n                    break;\n                line++;\n            }\n            reader.close();\n        } catch (Exception e) {\n            e.printStackTrace();\n        } finally {\n            if (reader != null) {\n                try {\n                    reader.close();\n                } catch (IOException e1) {\n                }\n            }\n        }\n        producer.close();\n    }\n}\n<\/code><\/pre>\n<p>\u6253\u5f00WinSCP\uff0c\u5c06ghbProducer.java\u53ca\u5b9e\u9a8c\u6570\u636e\u590d\u5236\u5230\u865a\u62df\u673acluster1\u7684\/home\/hadoop\u8def\u5f84\u4e0b\u3002<\/p>\n<pre><code class=\"language-php line-numbers\">\/\/ \u7f16\u8bd1\njavac -cp \/usr\/local\/kafka_2.10-0.8.2.1\/libs\/*: ghbProducer.java\n\/\/ \u8fd0\u884c\njava -cp \/usr\/local\/kafka_2.10-0.8.2.1\/libs\/*: ghbProducer\n<\/code><\/pre>\n<p><img decoding=\"async\" src=\"http:\/\/www.guanhaobo.cn\/wp-content\/uploads\/2020\/06\/2004.jpg\" alt=\"\" \/><\/p>\n<p>\u8f93\u5165topic\u540d\u79f0\u4ee5\u53ca\u53d1\u9001\u7684\u884c\u6570\u3002<\/p>\n<p><img decoding=\"async\" src=\"http:\/\/www.guanhaobo.cn\/wp-content\/uploads\/2020\/06\/2005.jpg\" alt=\"\" \/><\/p>\n<h4>6.2 \u5b9e\u73b0\u6d88\u8d39\u8005\u7a0b\u5e8f\uff0c\u4ece\u6307\u5b9a\u7684topic\u4e2d\u8ba2\u9605\u6570\u636e\u5e76\u5c06\u6d88\u8d39\u5f97\u5230\u7684\u6570\u636e\u5b58\u5230\u672c\u5730\u6587\u4ef6\u4e2d\u3002<\/h4>\n<p>\u7f16\u5199\u7a0b\u5e8fghbConsumer.java<\/p>\n<pre><code class=\"language-java line-numbers\">import java.util.HashMap;\nimport java.util.List;\nimport java.util.Map;\nimport java.util.Properties;\nimport java.io.BufferedWriter;\nimport java.io.File;\nimport java.io.FileNotFoundException;\nimport java.io.FileReader;\nimport java.io.FileWriter;\nimport java.util.Scanner;\nimport kafka.consumer.Consumer;\nimport kafka.consumer.ConsumerConfig;\nimport kafka.consumer.ConsumerIterator;\nimport kafka.consumer.KafkaStream;\nimport kafka.javaapi.consumer.ConsumerConnector;\n\npublic class ghbConsumer {\n\n    public static void main(String[] args) {\n        try {\n            Scanner in = new Scanner(System.in);\n            String topic, path;\n            System.out.print(\"\u8bf7\u8f93\u5165topic\u540d\u79f0\uff1a\");\n            topic = in.next();\n            System.out.print(\"\u4fdd\u5b58\u81f3\u6587\u4ef6\uff1a\");\n            path = in.next();\n            ConsumerConnector consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());\n            Map&lt;String, Integer&gt; topicCountMap = new HashMap&lt;String, Integer&gt;();\n            topicCountMap.put(topic, new Integer(1));\n            Map&lt;String, List&lt;KafkaStream&lt;byte[], byte[]&gt;&gt;&gt; consumerMap = consumer.createMessageStreams(topicCountMap);\n            KafkaStream&lt;byte[], byte[]&gt; stream = consumerMap.get(topic).get(0);\n            ConsumerIterator&lt;byte[], byte[]&gt; it = stream.iterator();\n            int i = 1;\n            while (it.hasNext()) {\n                BufferedWriter bw = new BufferedWriter(new FileWriter(path, true));\/\/ \u8ffd\u52a0\n                String out = new String(it.next().message());\n                bw.write(out + '\\n');\n                System.out.println(\"\u6d88\u8d39\u7b2c\" + i + \"\u884c\u6570\u636e    \" + out);\n                i++;\n                bw.close();\n            }\n        } catch (Exception e) {\n            \/\/ TODO Auto-generated catch block\n            e.printStackTrace();\n        }\n    }\n\n    private static ConsumerConfig createConsumerConfig() {\n        Properties props = new Properties();\n        props.put(\"group.id\", \"group1\");\n        props.put(\"zookeeper.connect\", \"cluster1:2181,cluster2:2181,cluster3:2181\");\n        props.put(\"zookeeper.session.timeout.ms\", \"400\");\n        props.put(\"zookeeper.sync.time.ms\", \"200\");\n        props.put(\"auto.commit.interval.ms\", \"1000\");\n        return new ConsumerConfig(props);\n    }\n}\n<\/code><\/pre>\n<p>\u6253\u5f00WinSCP\uff0c\u5c06ghbConsumer.java\u590d\u5236\u5230\u865a\u62df\u673acluster2\u7684\/home\/hadoop\u8def\u5f84\u4e0b\u3002<\/p>\n<pre><code class=\"language-php line-numbers\">\/\/ \u7f16\u8bd1\njavac -cp \/usr\/local\/kafka_2.10-0.8.2.1\/libs\/*: ghbConsumer.java\n\/\/ \u8fd0\u884c\njava -cp \/usr\/local\/kafka_2.10-0.8.2.1\/libs\/*: ghbConsumer\n<\/code><\/pre>\n<p><img decoding=\"async\" src=\"http:\/\/www.guanhaobo.cn\/wp-content\/uploads\/2020\/06\/2006.jpg\" alt=\"\" \/><\/p>\n<p>\u8f93\u5165topic\u540d\u79f0\u548c\u4fdd\u5b58\u6587\u4ef6\u7684\u8def\u5f84\u3002<\/p>\n<p><img decoding=\"async\" src=\"http:\/\/www.guanhaobo.cn\/wp-content\/uploads\/2020\/06\/2007.jpg\" alt=\"\" \/><\/p>\n<p>\u67e5\u770b\u4fdd\u5b58\u7684\u6587\u4ef6<\/p>\n<pre><code class=\"language-cpp line-numbers\">ls\ncat out.txt\n<\/code><\/pre>\n<p><img decoding=\"async\" src=\"http:\/\/www.guanhaobo.cn\/wp-content\/uploads\/2020\/06\/2008.jpg\" alt=\"\" \/><\/p>\n<h3>7 \u5173\u673a\u524d\u64cd\u4f5c<\/h3>\n<pre><code class=\"language-cpp line-numbers\">\/\/\u505c\u6b62kafka\nkafka-server-stop.sh\n\/\/\u505c\u6b62zookeeper\nzkServer.sh stop\n<\/code><\/pre>\n","protected":false},"excerpt":{"rendered":"<p>1 \u5b9e\u9a8c\u76ee\u7684 \u4e86\u89e3Kafka\u4e2d\u7684\u751f\u4ea7\u8005\u3001\u6d88\u8d39\u8005\u3001topic\u7b49\u57fa\u672c\u6982\u5ff5\uff1b \u719f\u6089Kafka Shell\u5e38\u7528\u547d\u4ee4\uff1b  [&hellip;]<\/p>\n","protected":false},"author":2,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[6],"tags":[44],"class_list":["post-571","post","type-post","status-publish","format-standard","hentry","category-homework","tag-44"],"_links":{"self":[{"href":"https:\/\/www.guanhaobo.cn\/index.php?rest_route=\/wp\/v2\/posts\/571","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.guanhaobo.cn\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.guanhaobo.cn\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.guanhaobo.cn\/index.php?rest_route=\/wp\/v2\/users\/2"}],"replies":[{"embeddable":true,"href":"https:\/\/www.guanhaobo.cn\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=571"}],"version-history":[{"count":0,"href":"https:\/\/www.guanhaobo.cn\/index.php?rest_route=\/wp\/v2\/posts\/571\/revisions"}],"wp:attachment":[{"href":"https:\/\/www.guanhaobo.cn\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=571"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.guanhaobo.cn\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=571"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.guanhaobo.cn\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=571"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}