45. Apache Beam and Kafka Setup

A downside of open source is thin documentation, and when a bug or outage hits, your own staff has to solve every problem. Since no license with vendor support was purchased, everything must be handled in-house. Development is hard, but installing and reconfiguring these systems is no simple job either. The original intent of this blog was to focus on development. But the setup is complex, and I judged that explaining only the source code would not get the ideas across effectively, so I cover installation and configuration as well. The problem is that this makes the posts very long, so I am splitting the development source and the installation/configuration scripts into separate posts. The goal here is to explain the Apache Beam and Kafka setup, but even so the post has grown long.

 

First Kafka Demo

Start the ZooKeeper and Kafka processes from the kafka_2.12-2.5.0 distribution.

[root@localhost bin]# pwd
/home/aws/Downloads/kafka_2.12-2.5.0/bin
[root@localhost bin]#

Start ZooKeeper. (Note that the command below points at the zookeeper.properties from the older kafka_2.11-0.10.0.0 download; as the log shows, the kafka_2.12-2.5.0 binaries read it without complaint.)

[root@localhost bin]# ./zookeeper-server-start.sh /home/aws/Downloads/kafka_2.11-0.10.0.0/config/zookeeper.properties
[2022-01-14 09:28:17,339] INFO clientPortAddress is 0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-01-14 09:28:17,340] INFO secureClientPort is not set (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-01-14 09:28:17,348] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2022-01-14 09:28:17,348] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2022-01-14 09:28:17,348] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2022-01-14 09:28:17,348] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2022-01-14 09:28:17,356] INFO Log4j found with jmx enabled. (org.apache.zookeeper.jmx.ManagedUtil)
[2022-01-14 09:28:17,379] INFO Reading configuration from: /home/aws/Downloads/kafka_2.11-0.10.0.0/config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-01-14 09:28:17,379] INFO clientPortAddress is 0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-01-14 09:28:17,379] INFO secureClientPort is not set (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-01-14 09:28:17,379] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2022-01-14 09:28:17,381] INFO zookeeper.snapshot.trust.empty : false (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2022-01-14 09:28:17,395] INFO Server environment:zookeeper.version=3.5.7-f0fdd52973d373ffd9c86b81d99842dc2c7f660e, built on 02/10/2020 11:30 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,395] INFO Server environment:host.name=localhost (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,395] INFO Server environment:java.version=1.8.0_302 (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,395] INFO Server environment:java.vendor=Red Hat, Inc. (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.302.b08-0.el7_9.x86_64/jre (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:java.class.path=/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/activation-1.1.1.jar:... (remaining library jars under kafka_2.12-2.5.0/libs omitted) (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:java.io.tmpdir=/tmp (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:os.version=3.10.0-1127.el7.x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:user.name=root (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:user.home=/root (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:user.dir=/home/aws/Downloads/kafka_2.12-2.5.0/bin (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:os.memory.free=497MB (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,397] INFO Server environment:os.memory.max=512MB (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,397] INFO Server environment:os.memory.total=512MB (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,398] INFO minSessionTimeout set to 6000 (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,398] INFO maxSessionTimeout set to 60000 (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,398] INFO Created server with tickTime 3000 minSessionTimeout 6000 maxSessionTimeout 60000 datadir /tmp/zookeeper/version-2 snapdir /tmp/zookeeper/version-2 (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,420] INFO Logging initialized @467ms to org.eclipse.jetty.util.log.Slf4jLog (org.eclipse.jetty.util.log)
[2022-01-14 09:28:17,562] WARN o.e.j.s.ServletContextHandler@33f88ab{/,null,UNAVAILABLE} contextPath ends with /* (org.eclipse.jetty.server.handler.ContextHandler)
[2022-01-14 09:28:17,562] WARN Empty contextPath (org.eclipse.jetty.server.handler.ContextHandler)
[2022-01-14 09:28:17,578] INFO jetty-9.4.24.v20191120; built: 2019-11-20T21:37:49.771Z; git: 363d5f2df3a8a28de40604320230664b9c793c16; jvm 1.8.0_302-b08 (org.eclipse.jetty.server.Server)
[2022-01-14 09:28:17,660] INFO DefaultSessionIdManager workerName=node0 (org.eclipse.jetty.server.session)
[2022-01-14 09:28:17,661] INFO No SessionScavenger set, using defaults (org.eclipse.jetty.server.session)
[2022-01-14 09:28:17,663] INFO node0 Scavenging every 660000ms (org.eclipse.jetty.server.session)
[2022-01-14 09:28:17,672] INFO Started o.e.j.s.ServletContextHandler@33f88ab{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler)
[2022-01-14 09:28:17,685] INFO Started ServerConnector@52a86356{HTTP/1.1,[http/1.1]}{0.0.0.0:8080} (org.eclipse.jetty.server.AbstractConnector)
[2022-01-14 09:28:17,686] INFO Started @733ms (org.eclipse.jetty.server.Server)
[2022-01-14 09:28:17,686] INFO Started AdminServer on address 0.0.0.0, port 8080 and command URL /commands (org.apache.zookeeper.server.admin.JettyAdminServer)
[2022-01-14 09:28:17,689] INFO Using org.apache.zookeeper.server.NIOServerCnxnFactory as server connection factory (org.apache.zookeeper.server.ServerCnxnFactory)
[2022-01-14 09:28:17,693] INFO Configuring NIO connection handler with 10s sessionless connection timeout, 1 selector thread(s), 4 worker threads, and 64 kB direct buffers. (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2022-01-14 09:28:17,695] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2022-01-14 09:28:17,709] INFO zookeeper.snapshotSizeFactor = 0.33 (org.apache.zookeeper.server.ZKDatabase)
[2022-01-14 09:28:17,712] INFO Reading snapshot /tmp/zookeeper/version-2/snapshot.10f (org.apache.zookeeper.server.persistence.FileSnap)
[2022-01-14 09:28:17,742] INFO Snapshotting: 0x134 to /tmp/zookeeper/version-2/snapshot.134 (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2022-01-14 09:28:17,777] INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager)
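Running the start script in the foreground ties up the terminal. As a sketch (paths are from this demo's environment; adjust for your own install), ZooKeeper can be run in the background and checked for its listening port like this:

```shell
# Run ZooKeeper in the background and confirm it is listening on port 2181.
# This uses the zookeeper.properties bundled with kafka_2.12-2.5.0; the run
# above points at the config from the older kafka_2.11-0.10.0.0 download.
cd /home/aws/Downloads/kafka_2.12-2.5.0/bin
nohup ./zookeeper-server-start.sh ../config/zookeeper.properties > /tmp/zookeeper.log 2>&1 &

# Check the listening port.
ss -ltn | grep 2181
```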

Start the Kafka broker in the background (again pointing at the server.properties from the kafka_2.11-0.10.0.0 download):

[root@localhost bin]# ./kafka-server-start.sh /home/aws/Downloads/kafka_2.11-0.10.0.0/config/server.properties &
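The demo publishes to and reads from a topic named messenger (it appears in the consumer commands further below). If the broker does not auto-create topics, it can be created explicitly; the partition and replication counts here are illustrative (the logs later show a single partition, messenger-0):

```shell
# Create the "messenger" topic used by the demo.
# With auto.create.topics.enable=true (the broker default), this is optional.
cd /home/aws/Downloads/kafka_2.12-2.5.0/bin
./kafka-topics.sh --create \
  --bootstrap-server 192.168.17.133:9092 \
  --topic messenger \
  --partitions 1 \
  --replication-factor 1

# Verify the topic exists.
./kafka-topics.sh --list --bootstrap-server 192.168.17.133:9092
```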

Start the producer program: the Beam pipeline com.andrewjones.KafkaProducerExample, executed on the direct runner.

[root@localhost kafka-beam-example-master]# mvn compile exec:java -Dexec.mainClass=com.andrewjones.KafkaProducerExample -Dexec.args="--inputFile=pom.xml --output=producer" -Pdirect-runner
[INFO] Scanning for projects...
[INFO] 
[INFO] -----------------< com.andrewjones:kafka-beam-example >-----------------
[INFO] Building kafka-beam-example 0.1
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ kafka-beam-example ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] Copying 1 resource
[INFO] 
[INFO] --- maven-compiler-plugin:3.6.1:compile (default-compile) @ kafka-beam-example ---
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ kafka-beam-example ---
Jan 14, 2022 9:34:03 AM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ProducerConfig values: 
	acks = 1
	batch.size = 16384
	block.on.buffer.full = false
	bootstrap.servers = [192.168.17.133:9092]
	buffer.memory = 33554432
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	interceptor.classes = null
	key.serializer = class org.apache.kafka.common.serialization.LongSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.fetch.timeout.ms = 60000
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 3
	retry.backoff.ms = 100
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	timeout.ms = 30000
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

... (the identical ProducerConfig block is logged twice more; repeats omitted)

Jan 14, 2022 9:34:04 AM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ProducerConfig values: 
	acks = 1
	batch.size = 16384
	block.on.buffer.full = false
	bootstrap.servers = [192.168.17.133:9092]
	buffer.memory = 33554432
	client.id = producer-1
	compression.type = none
	connections.max.idle.ms = 540000
	interceptor.classes = null
	key.serializer = class org.apache.kafka.common.serialization.LongSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.fetch.timeout.ms = 60000
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 3
	retry.backoff.ms = 100
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	timeout.ms = 30000
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

... (equivalent ProducerConfig blocks for client.id = producer-2 and producer-3 omitted)

Jan 14, 2022 9:34:04 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 0.10.1.0
Jan 14, 2022 9:34:04 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : 3402a74efb23d1d4
Jan 14, 2022 9:34:04 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 0.10.1.0
Jan 14, 2022 9:34:04 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : 3402a74efb23d1d4
Jan 14, 2022 9:34:04 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 0.10.1.0
Jan 14, 2022 9:34:04 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : 3402a74efb23d1d4
Jan 14, 2022 9:34:04 AM org.apache.kafka.clients.producer.KafkaProducer close
INFO: Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
Jan 14, 2022 9:34:04 AM org.apache.kafka.clients.producer.KafkaProducer close
INFO: Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
Jan 14, 2022 9:34:04 AM org.apache.kafka.clients.producer.KafkaProducer close
INFO: Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  3.275 s
[INFO] Finished at: 2022-01-14T09:34:04-05:00
[INFO] ------------------------------------------------------------------------
[root@localhost kafka-beam-example-master]#
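For reference, here is a minimal sketch of what a Beam producer pipeline like KafkaProducerExample typically looks like. This is an illustration assembled from the configuration visible in the logs above (bootstrap server 192.168.17.133:9092, topic messenger, Long key / String value serializers), not the repository's exact source:

```java
// Illustrative Beam pipeline: read a text file and publish each line to Kafka.
// Not the exact source of com.andrewjones.KafkaProducerExample; the settings
// are taken from the ProducerConfig values logged above.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(TextIO.read().from("pom.xml"))                  // --inputFile in the demo
     .apply(KafkaIO.<Long, String>write()
         .withBootstrapServers("192.168.17.133:9092")
         .withTopic("messenger")
         .withKeySerializer(LongSerializer.class)
         .withValueSerializer(StringSerializer.class)
         .values());                                        // publish values only, no keys

    p.run().waitUntilFinish();
  }
}
```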

Start a consumer process manually with the console consumer; it prints the messages consumed from the topic. (The GroupCoordinator INFO lines below come from the broker, which is running in the background in the same shell.)

[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.17.133:9092 --topic messenger
[2022-01-14 09:32:20,555] INFO [GroupCoordinator 0]: Preparing to rebalance group console-consumer-14309 in state PreparingRebalance with old generation 0 (__consumer_offsets-18) (reason: Adding new member consumer-console-consumer-14309-1-46f4edc7-b43b-4551-90fe-9d163933cdaa with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-01-14 09:32:23,589] INFO [GroupCoordinator 0]: Stabilized group console-consumer-14309 generation 1 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
[2022-01-14 09:32:23,620] INFO [GroupCoordinator 0]: Assignment received from leader for group console-consumer-14309 for generation 1 (kafka.coordinator.group.GroupCoordinator)
hi
hi there
hi sue bob
hi bob
hi sue
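The hi / hi there / ... messages printed above had to be published beforehand. One way to do that, not shown in this post, is the console producer bundled with Kafka:

```shell
# Publish test messages to the "messenger" topic: type one message per line,
# then Ctrl+C to exit. --broker-list is the flag used by kafka 2.5.0's script.
cd /home/aws/Downloads/kafka_2.12-2.5.0/bin
./kafka-console-producer.sh --broker-list 192.168.17.133:9092 --topic messenger
```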

Start the consumer program: the Beam pipeline com.andrewjones.KafkaConsumerExample on the direct runner. Unlike the console consumer, it does not print the consumed messages to the terminal; judging from the --output=consumer argument, it presumably writes its results to output files instead.

[root@localhost kafka-beam-example-master]# mvn compile exec:java -Dexec.mainClass=com.andrewjones.KafkaConsumerExample -Dexec.args="--inputFile=pom.xml --output=consumer" -Pdirect-runner
[INFO] Scanning for projects...
[INFO] 
[INFO] -----------------< com.andrewjones:kafka-beam-example >-----------------
[INFO] Building kafka-beam-example 0.1
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ kafka-beam-example ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] Copying 1 resource
[INFO] 
[INFO] --- maven-compiler-plugin:3.6.1:compile (default-compile) @ kafka-beam-example ---
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ kafka-beam-example ---
Jan 14, 2022 9:35:00 AM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values: 
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [192.168.17.133:9092]
	check.crcs = true
	client.id = 
	connections.max.idle.ms = 540000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = 
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 524288
	reconnect.backoff.ms = 50
	request.timeout.ms = 305000
	retry.backoff.ms = 100
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

... (an equivalent ConsumerConfig block with client.id = consumer-1 omitted)

Jan 14, 2022 9:35:00 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 0.10.1.0
Jan 14, 2022 9:35:00 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : 3402a74efb23d1d4
Jan 14, 2022 9:35:00 AM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource split
INFO: Partitions assigned to split 0 (total 1): messenger-0
Jan 14, 2022 9:35:00 AM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values: 
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [192.168.17.133:9092]
	check.crcs = true
	client.id = 
	connections.max.idle.ms = 540000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = 
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 524288
	reconnect.backoff.ms = 50
	request.timeout.ms = 305000
	retry.backoff.ms = 100
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

Jan 14, 2022 9:35:00 AM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values: 
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [192.168.17.133:9092]
	check.crcs = true
	client.id = consumer-2
	connections.max.idle.ms = 540000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = 
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 524288
	reconnect.backoff.ms = 50
	request.timeout.ms = 305000
	retry.backoff.ms = 100
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

Jan 14, 2022 9:35:00 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 0.10.1.0
Jan 14, 2022 9:35:00 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : 3402a74efb23d1d4
Jan 14, 2022 9:35:01 AM org.apache.kafka.clients.consumer.internals.AbstractCoordinator$GroupCoordinatorResponseHandler onSuccess
INFO: Discovered coordinator 192.168.17.133:9092 (id: 2147483647 rack: null) for group .
Jan 14, 2022 9:35:01 AM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader start
INFO: Reader-0: reading from messenger-0 starting at offset 9
Jan 14, 2022 9:35:01 AM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values: 
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [192.168.17.133:9092]
	check.crcs = true
	client.id = 
	connections.max.idle.ms = 540000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = Reader-0_offset_consumer_1442400895_none
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 524288
	reconnect.backoff.ms = 50
	request.timeout.ms = 305000
	retry.backoff.ms = 100
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

Jan 14, 2022 9:35:01 AM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values: 
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [192.168.17.133:9092]
	check.crcs = true
	client.id = consumer-3
	connections.max.idle.ms = 540000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = Reader-0_offset_consumer_1442400895_none
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 524288
	reconnect.backoff.ms = 50
	request.timeout.ms = 305000
	retry.backoff.ms = 100
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

Jan 14, 2022 9:35:01 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 0.10.1.0
Jan 14, 2022 9:35:01 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : 3402a74efb23d1d4
Jan 14, 2022 9:35:01 AM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader advance
INFO: Reader-0: first record offset 9
Jan 14, 2022 9:35:01 AM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader consumerPollLoop
INFO: Reader-0: Returning from consumer pool loop
Jan 14, 2022 9:35:01 AM org.apache.beam.sdk.io.WriteFiles$WriteShardedBundles processElement
INFO: Opening writer for write operation TextWriteOperation{tempDirectory=/home/aws/Downloads/kafka-beam-example-master/.temp-beam-2022-01-14_14-35-00-1/, windowedWrites=false}
Jan 14, 2022 9:35:01 AM org.apache.beam.sdk.io.WriteFiles$WriteShardedBundles processElement
INFO: Opening writer for write operation TextWriteOperation{tempDirectory=/home/aws/Downloads/kafka-beam-example-master/.temp-beam-2022-01-14_14-35-00-1/, windowedWrites=false}
Jan 14, 2022 9:35:01 AM org.apache.beam.sdk.io.WriteFiles$WriteShardedBundles processElement
INFO: Opening writer for write operation TextWriteOperation{tempDirectory=/home/aws/Downloads/kafka-beam-example-master/.temp-beam-2022-01-14_14-35-00-1/, windowedWrites=false}
Jan 14, 2022 9:35:01 AM org.apache.beam.sdk.io.WriteFiles$2 processElement
INFO: Finalizing write operation TextWriteOperation{tempDirectory=/home/aws/Downloads/kafka-beam-example-master/.temp-beam-2022-01-14_14-35-00-1/, windowedWrites=false}.
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  4.462 s
[INFO] Finished at: 2022-01-14T09:35:01-05:00
[INFO] ------------------------------------------------------------------------

Instead of printing to the console, the pipeline writes its output as files in the specified directory.

[aws@localhost kafka-beam-example-master]$ ll
total 40
-rw-r--r--. 1 aws  aws    617 Oct 24  2017 docker-compose.yaml
-rw-r--r--. 1 aws  aws   1216 Oct 24  2017 Makefile
-rw-r--r--. 1 aws  aws  13609 Oct 24  2017 pom.xml
-rw-r--r--. 1 aws  aws    515 Oct 24  2017 README.md
drwxr-xr-x. 3 aws  aws     18 Oct 24  2017 src
drwxr-xr-x. 5 root root    66 Oct 24 08:53 target
-rw-r--r--. 1 root root     0 Jan 14 09:43 wordcounts-00000-of-00004
-rw-r--r--. 1 root root     7 Jan 14 09:43 wordcounts-00001-of-00004
-rw-r--r--. 1 root root    16 Jan 14 09:43 wordcounts-00002-of-00004
-rw-r--r--. 1 root root     6 Jan 14 09:43 wordcounts-00003-of-00004
[aws@localhost kafka-beam-example-master]$
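The four `wordcounts-*` files above are Beam's default sharded text output, named `<prefix>-SSSSS-of-NNNNN`: each bundle writes its own shard, and a shard can be empty (note the 0-byte `wordcounts-00000-of-00004`). A minimal shell sketch of this naming scheme, using hypothetical file contents in a temp directory, and of merging the shards back into one stream:

```shell
# Simulate Beam's sharded text output naming: <prefix>-SSSSS-of-NNNNN
dir=$(mktemp -d)
: > "$dir/wordcounts-00000-of-00004"           # shards may be empty
printf 'kafka: 2\n'          > "$dir/wordcounts-00001-of-00004"
printf 'beam: 1\nhello: 3\n' > "$dir/wordcounts-00002-of-00004"
printf 'world: 1\n'          > "$dir/wordcounts-00003-of-00004"
# The glob sorts lexically, so concatenating yields one complete result set
cat "$dir"/wordcounts-*-of-00004
```

The same `cat wordcounts-*-of-00004` pattern works on the real output directory when you want a single merged view of the word counts.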

Create the topic if it does not exist yet:

[root@localhost bin]# ./kafka-topics.sh --create --zookeeper 192.168.17.133:2181 --replication-factor 1 --partitions 1 --topic messenger

Delete the topic:

[root@localhost bin]# ./kafka-topics.sh --zookeeper 192.168.17.133:2181 --delete --topic messenger

ZooKeeper's zookeeper.properties:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
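This is the stock single-node demo configuration. Keeping snapshots under `/tmp` means they can disappear on reboot, and `maxClientCnxns=0` disables the per-IP connection limit entirely. A hedged sketch of overrides one might apply outside a demo (the path and limit below are assumptions, not part of the original setup):

```properties
# Hypothetical non-demo overrides for zookeeper.properties
dataDir=/var/lib/zookeeper     # survives reboots, unlike /tmp
clientPort=2181
maxClientCnxns=60              # restore a per-IP connection cap
```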

Kafka's server.properties:

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = security_protocol://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.17.133:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/home/aws/Downloads/kafka_2.11-0.10.0.0/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
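The demo binds the broker directly to the VM's address (192.168.17.133). An alternative pattern, mentioned in the comments above but unused here, is to listen on all interfaces and advertise the externally reachable address to producers and consumers. A hedged config sketch (the address is the demo's; the layout is an assumption):

```properties
# Hypothetical alternative: bind on all interfaces, advertise the routable address
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.17.133:9092
```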


Second Kafka demo

Start ZooKeeper:

[root@localhost bin]# ./zookeeper-server-start.sh /home/aws/Downloads/kafka_2.11-0.10.0.0/config/zookeeper.properties
[2022-01-14 09:28:17,339] INFO clientPortAddress is 0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-01-14 09:28:17,340] INFO secureClientPort is not set (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-01-14 09:28:17,348] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2022-01-14 09:28:17,348] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2022-01-14 09:28:17,348] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2022-01-14 09:28:17,348] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2022-01-14 09:28:17,356] INFO Log4j found with jmx enabled. (org.apache.zookeeper.jmx.ManagedUtil)
[2022-01-14 09:28:17,379] INFO Reading configuration from: /home/aws/Downloads/kafka_2.11-0.10.0.0/config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-01-14 09:28:17,379] INFO clientPortAddress is 0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-01-14 09:28:17,379] INFO secureClientPort is not set (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-01-14 09:28:17,379] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2022-01-14 09:28:17,381] INFO zookeeper.snapshot.trust.empty : false (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2022-01-14 09:28:17,395] INFO Server environment:zookeeper.version=3.5.7-f0fdd52973d373ffd9c86b81d99842dc2c7f660e, built on 02/10/2020 11:30 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,395] INFO Server environment:host.name=localhost (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,395] INFO Server environment:java.version=1.8.0_302 (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,395] INFO Server environment:java.vendor=Red Hat, Inc. (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.302.b08-0.el7_9.x86_64/jre (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:java.class.path=/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/activation-1.1.1.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/aopalliance-repackaged-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/argparse4j-0.7.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/audience-annotations-0.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/commons-cli-1.4.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/commons-lang3-3.8.1.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/connect-api-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/connect-basic-auth-extension-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/connect-file-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/connect-json-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/connect-mirror-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/connect-mirror-client-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/connect-runtime-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/connect-transforms-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/hk2-api-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/hk2-locator-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/hk2-utils-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-annotations-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-core-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-databind-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-dataformat-csv-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-datatype-jdk8-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-jaxrs-base-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-jaxrs-json-provider-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-module-jaxb-annotations-2.10.2.jar:/home/aws/Download
s/kafka_2.12-2.5.0/bin/../libs/jackson-module-paranamer-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-module-scala_2.12-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jakarta.activation-api-1.2.1.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jakarta.annotation-api-1.3.4.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jakarta.inject-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jakarta.ws.rs-api-2.1.5.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/javassist-3.22.0-CR2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/javassist-3.26.0-GA.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/javax.servlet-api-3.1.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/javax.ws.rs-api-2.1.1.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jaxb-api-2.3.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jersey-client-2.28.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jersey-common-2.28.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jersey-container-servlet-2.28.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jersey-container-servlet-core-2.28.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jersey-hk2-2.28.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jersey-media-jaxb-2.28.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jersey-server-2.28.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jetty-client-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jetty-continuation-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jetty-http-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jetty-io-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jetty-security-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jetty-server-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jett
y-servlet-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jetty-servlets-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jetty-util-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jopt-simple-5.0.4.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka_2.12-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka_2.12-2.5.0-sources.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka-clients-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka-log4j-appender-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka-streams-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka-streams-examples-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka-streams-scala_2.12-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka-streams-test-utils-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka-tools-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/log4j-1.2.17.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/lz4-java-1.7.1.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/maven-artifact-3.6.3.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/metrics-core-2.2.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/netty-buffer-4.1.45.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/netty-codec-4.1.45.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/netty-common-4.1.45.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/netty-handler-4.1.45.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/netty-resolver-4.1.45.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/netty-transport-4.1.45.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/netty-transport-native-epoll-4.1.45.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/netty-transport-native-unix-common-4.1.45.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/osgi-resource-locator-1.0.1.jar:/
home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/paranamer-2.8.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/plexus-utils-3.2.1.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/reflections-0.9.12.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/rocksdbjni-5.18.3.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/scala-collection-compat_2.12-2.1.3.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/scala-java8-compat_2.12-0.9.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/scala-library-2.12.10.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/scala-logging_2.12-3.9.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/scala-reflect-2.12.10.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/slf4j-api-1.7.30.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/slf4j-log4j12-1.7.30.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/snappy-java-1.1.7.3.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/validation-api-2.0.1.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/zookeeper-3.5.7.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/zookeeper-jute-3.5.7.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/zstd-jni-1.4.4-7.jar (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:java.io.tmpdir=/tmp (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:os.version=3.10.0-1127.el7.x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:user.name=root (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:user.home=/root (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:user.dir=/home/aws/Downloads/kafka_2.12-2.5.0/bin (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:os.memory.free=497MB (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,397] INFO Server environment:os.memory.max=512MB (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,397] INFO Server environment:os.memory.total=512MB (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,398] INFO minSessionTimeout set to 6000 (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,398] INFO maxSessionTimeout set to 60000 (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,398] INFO Created server with tickTime 3000 minSessionTimeout 6000 maxSessionTimeout 60000 datadir /tmp/zookeeper/version-2 snapdir /tmp/zookeeper/version-2 (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,420] INFO Logging initialized @467ms to org.eclipse.jetty.util.log.Slf4jLog (org.eclipse.jetty.util.log)
[2022-01-14 09:28:17,562] WARN o.e.j.s.ServletContextHandler@33f88ab{/,null,UNAVAILABLE} contextPath ends with /* (org.eclipse.jetty.server.handler.ContextHandler)
[2022-01-14 09:28:17,562] WARN Empty contextPath (org.eclipse.jetty.server.handler.ContextHandler)
[2022-01-14 09:28:17,578] INFO jetty-9.4.24.v20191120; built: 2019-11-20T21:37:49.771Z; git: 363d5f2df3a8a28de40604320230664b9c793c16; jvm 1.8.0_302-b08 (org.eclipse.jetty.server.Server)
[2022-01-14 09:28:17,660] INFO DefaultSessionIdManager workerName=node0 (org.eclipse.jetty.server.session)
[2022-01-14 09:28:17,661] INFO No SessionScavenger set, using defaults (org.eclipse.jetty.server.session)
[2022-01-14 09:28:17,663] INFO node0 Scavenging every 660000ms (org.eclipse.jetty.server.session)
[2022-01-14 09:28:17,672] INFO Started o.e.j.s.ServletContextHandler@33f88ab{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler)
[2022-01-14 09:28:17,685] INFO Started ServerConnector@52a86356{HTTP/1.1,[http/1.1]}{0.0.0.0:8080} (org.eclipse.jetty.server.AbstractConnector)
[2022-01-14 09:28:17,686] INFO Started @733ms (org.eclipse.jetty.server.Server)
[2022-01-14 09:28:17,686] INFO Started AdminServer on address 0.0.0.0, port 8080 and command URL /commands (org.apache.zookeeper.server.admin.JettyAdminServer)
[2022-01-14 09:28:17,689] INFO Using org.apache.zookeeper.server.NIOServerCnxnFactory as server connection factory (org.apache.zookeeper.server.ServerCnxnFactory)
[2022-01-14 09:28:17,693] INFO Configuring NIO connection handler with 10s sessionless connection timeout, 1 selector thread(s), 4 worker threads, and 64 kB direct buffers. (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2022-01-14 09:28:17,695] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2022-01-14 09:28:17,709] INFO zookeeper.snapshotSizeFactor = 0.33 (org.apache.zookeeper.server.ZKDatabase)
[2022-01-14 09:28:17,712] INFO Reading snapshot /tmp/zookeeper/version-2/snapshot.10f (org.apache.zookeeper.server.persistence.FileSnap)
[2022-01-14 09:28:17,742] INFO Snapshotting: 0x134 to /tmp/zookeeper/version-2/snapshot.134 (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2022-01-14 09:28:17,777] INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager)

Start the Kafka broker

[root@localhost bin]# ./kafka-server-start.sh /home/aws/Downloads/kafka_2.11-0.10.0.0/config/server.properties &

Build and run the consumer program

[root@localhost Beam-examples-master]# mvn compile exec:java -Dexec.mainClass=com.sunil.WindowedWordCount -Pdirect-runner -Dexec.args="--output=/home/aws/Downloads/output/"
[INFO] Scanning for projects...
[WARNING] 
[WARNING] Some problems were encountered while building the effective model for com.sunil:beam-examples:jar:0.1
[WARNING] 'build.plugins.plugin.(groupId:artifactId)' must be unique but found duplicate declaration of plugin org.apache.maven.plugins:maven-jar-plugin @ line 124, column 21
[WARNING] 
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING] 
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING] 
[INFO] 
[INFO] ----------------------< com.sunil:beam-examples >-----------------------
[INFO] Building beam-examples 0.1
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ beam-examples ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /home/aws/Downloads/Beam-examples-master/src/main/resources
[INFO] 
[INFO] --- maven-compiler-plugin:3.7.0:compile (default-compile) @ beam-examples ---
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] --- exec-maven-plugin:1.6.0:java (default-cli) @ beam-examples ---
Oct 24, 2021 8:31:39 AM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values: 
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	bootstrap.servers = [192.168.17.133:9092]
	check.crcs = true
	client.dns.lookup = default
	client.id = 
	client.rack = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = null
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 524288
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

Oct 24, 2021 8:31:39 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version: 2.4.0
Oct 24, 2021 8:31:39 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId: 77a89fcf8d7fa018
Oct 24, 2021 8:31:39 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka startTimeMs: 1635078699859
Oct 24, 2021 8:31:40 AM org.apache.kafka.clients.Metadata update
INFO: [Consumer clientId=consumer-1, groupId=null] Cluster ID: NVfRzZzESxOcPYrXHstbLg
Oct 24, 2021 8:31:40 AM org.apache.beam.sdk.io.kafka.KafkaUnboundedSource split
INFO: Partitions assigned to split 0 (total 1): messenger-0
Oct 24, 2021 8:31:40 AM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values: 
	...(values identical to the first ConsumerConfig dump above)

Oct 24, 2021 8:31:40 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version: 2.4.0
Oct 24, 2021 8:31:40 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId: 77a89fcf8d7fa018
Oct 24, 2021 8:31:40 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka startTimeMs: 1635078700243
Oct 24, 2021 8:31:40 AM org.apache.kafka.clients.consumer.KafkaConsumer assign
INFO: [Consumer clientId=consumer-2, groupId=null] Subscribed to partition(s): messenger-0
Oct 24, 2021 8:31:40 AM org.apache.kafka.clients.Metadata update
INFO: [Consumer clientId=consumer-2, groupId=null] Cluster ID: NVfRzZzESxOcPYrXHstbLg
Oct 24, 2021 8:31:40 AM org.apache.kafka.clients.consumer.internals.SubscriptionState maybeSeekUnvalidated
INFO: [Consumer clientId=consumer-2, groupId=null] Resetting offset for partition messenger-0 to offset 0.
Oct 24, 2021 8:31:40 AM org.apache.beam.sdk.io.kafka.KafkaUnboundedReader start
INFO: Reader-0: reading from messenger-0 starting at offset 0
Oct 24, 2021 8:31:40 AM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values: 
	...(values identical to the first ConsumerConfig dump above, except group.id = Reader-0_offset_consumer_1217782153_none)

Oct 24, 2021 8:31:40 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version: 2.4.0
Oct 24, 2021 8:31:40 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId: 77a89fcf8d7fa018
Oct 24, 2021 8:31:40 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka startTimeMs: 1635078700588
Oct 24, 2021 8:31:40 AM org.apache.kafka.clients.consumer.KafkaConsumer assign
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1217782153_none-3, groupId=Reader-0_offset_consumer_1217782153_none] Subscribed to partition(s): messenger-0
Oct 24, 2021 8:31:40 AM org.apache.kafka.clients.consumer.internals.SubscriptionState lambda$requestOffsetReset$3
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1217782153_none-3, groupId=Reader-0_offset_consumer_1217782153_none] Seeking to LATEST offset of partition messenger-0
Oct 24, 2021 8:31:40 AM org.apache.kafka.clients.Metadata update
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1217782153_none-3, groupId=Reader-0_offset_consumer_1217782153_none] Cluster ID: NVfRzZzESxOcPYrXHstbLg
Oct 24, 2021 8:31:40 AM org.apache.kafka.clients.consumer.internals.SubscriptionState maybeSeekUnvalidated
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1217782153_none-3, groupId=Reader-0_offset_consumer_1217782153_none] Resetting offset for partition messenger-0 to offset 0.
Oct 24, 2021 8:31:40 AM org.apache.kafka.clients.consumer.internals.SubscriptionState lambda$requestOffsetReset$3
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1217782153_none-3, groupId=Reader-0_offset_consumer_1217782153_none] Seeking to LATEST offset of partition messenger-0
Oct 24, 2021 8:31:40 AM org.apache.kafka.clients.consumer.internals.SubscriptionState maybeSeekUnvalidated
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1217782153_none-3, groupId=Reader-0_offset_consumer_1217782153_none] Resetting offset for partition messenger-0 to offset 0.
Oct 24, 2021 8:31:41 AM org.apache.kafka.clients.consumer.internals.SubscriptionState lambda$requestOffsetReset$3
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1217782153_none-3, groupId=Reader-0_offset_consumer_1217782153_none] Seeking to LATEST offset of partition messenger-0
Oct 24, 2021 8:31:41 AM org.apache.kafka.clients.consumer.internals.SubscriptionState maybeSeekUnvalidated
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1217782153_none-3, groupId=Reader-0_offset_consumer_1217782153_none] Resetting offset for partition messenger-0 to offset 0.
Oct 24, 2021 8:31:42 AM org.apache.kafka.clients.consumer.internals.SubscriptionState lambda$requestOffsetReset$3
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1217782153_none-3, groupId=Reader-0_offset_consumer_1217782153_none] Seeking to LATEST offset of partition messenger-0
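The WindowedWordCount pipeline groups Kafka records into fixed one-minute windows before counting words (the output file name later in this post spans exactly one minute). As a minimal pure-Python sketch of that windowing logic (not the Beam code itself), assuming the "ets" field is the event timestamp in epoch milliseconds:

```python
from collections import Counter, defaultdict

WINDOW_MS = 60_000  # 1-minute fixed windows, matching the demo's output files


def window_start(ets_ms):
    """Floor an epoch-millis event timestamp to its fixed-window start."""
    return ets_ms - (ets_ms % WINDOW_MS)


def windowed_word_count(records):
    """records: iterable of (ets_ms, text) pairs.

    Returns {window_start_ms: Counter of words}: one word count per
    fixed window, keyed by the window's start timestamp.
    """
    counts = defaultdict(Counter)
    for ets_ms, text in records:
        counts[window_start(ets_ms)].update(text.split())
    return dict(counts)
```

Beam additionally handles watermarks, late data, and triggers; this sketch only shows the window assignment and per-window aggregation that the pipeline performs.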

Run the producer program manually

[root@localhost scripts]# python3 GenMessage.py krafton
{"name":"krafton","message": "This message is from krafton","ets": 1642173571124}
>>{"name":"krafton","message": "This message is from krafton","ets": 1642173571124}
message sent!
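GenMessage.py itself is not shown in this post; judging from its output, it serializes a JSON payload with a name, a fixed message string, and an epoch-millisecond event timestamp ("ets") before sending it with a Kafka producer client. A hypothetical sketch of that payload construction (the send step is omitted):

```python
import json
import time


def gen_message(name, now_ms=None):
    """Build the JSON payload the producer sends: a name, a message
    derived from it, and 'ets', an epoch-millisecond event timestamp."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)  # default to the current time
    return json.dumps({
        "name": name,
        "message": f"This message is from {name}",
        "ets": now_ms,
    })
```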

Run the console consumer manually

[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.17.133:9092 --topic messenger
{"name":"philipjung","message": "This message is from philipjung","ets": 1635079382057}
{"name":"krafton","message": "This message is from krafton","ets": 1635079399238}
{"name":"krafton","message": "This message is from krafton","ets": 1642173571124}

Stop the console consumer. This is so the producer program can be tested next.

Run the producer program manually again.

[root@localhost scripts]# python3 GenMessage.py philipjung1234
{"name":"philipjung1234","message": "This message is from philipjung1234","ets": 1642173830085}
>>{"name":"philipjung1234","message": "This message is from philipjung1234","ets": 1642173830085}
message sent!

The consumer program reads the message from the topic and prints it. (The repeated "Seeking to LATEST offset" lines come from KafkaIO's separate offset-tracking consumer, which polls each partition's end offset to estimate the backlog.)

Jan 14, 2022 10:23:51 AM org.apache.kafka.clients.consumer.internals.SubscriptionState maybeSeekUnvalidated
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1408619537_none-3, groupId=Reader-0_offset_consumer_1408619537_none] Resetting offset for partition messenger-0 to offset 6.
arg1: {"name":"philipjung1234","message": "This message is from philipjung1234","ets": 1642173830085}arg0:  messenger
Jan 14, 2022 10:23:52 AM org.apache.kafka.clients.consumer.internals.SubscriptionState lambda$requestOffsetReset$3
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1408619537_none-3, groupId=Reader-0_offset_consumer_1408619537_none] Seeking to LATEST offset of partition messenger-0
Jan 14, 2022 10:23:52 AM org.apache.kafka.clients.consumer.internals.SubscriptionState maybeSeekUnvalidated
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1408619537_none-3, groupId=Reader-0_offset_consumer_1408619537_none] Resetting offset for partition messenger-0 to offset 7.
Jan 14, 2022 10:23:53 AM org.apache.kafka.clients.consumer.internals.SubscriptionState lambda$requestOffsetReset$3
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1408619537_none-3, groupId=Reader-0_offset_consumer_1408619537_none] Seeking to LATEST offset of partition messenger-0
Jan 14, 2022 10:23:53 AM org.apache.kafka.clients.consumer.internals.SubscriptionState maybeSeekUnvalidated
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1408619537_none-3, groupId=Reader-0_offset_consumer_1408619537_none] Resetting offset for partition messenger-0 to offset 7.
Jan 14, 2022 10:23:54 AM org.apache.kafka.clients.consumer.internals.SubscriptionState lambda$requestOffsetReset$3
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1408619537_none-3, groupId=Reader-0_offset_consumer_1408619537_none] Seeking to LATEST offset of partition messenger-0
Jan 14, 2022 10:23:54 AM org.apache.kafka.clients.consumer.internals.SubscriptionState maybeSeekUnvalidated
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1408619537_none-3, groupId=Reader-0_offset_consumer_1408619537_none] Resetting offset for partition messenger-0 to offset 7.
Jan 14, 2022 10:23:55 AM org.apache.kafka.clients.consumer.internals.SubscriptionState lambda$requestOffsetReset$3
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1408619537_none-3, groupId=Reader-0_offset_consumer_1408619537_none] Seeking to LATEST offset of partition messenger-0
Jan 14, 2022 10:23:55 AM org.apache.kafka.clients.consumer.internals.SubscriptionState maybeSeekUnvalidated
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1408619537_none-3, groupId=Reader-0_offset_consumer_1408619537_none] Resetting offset for partition messenger-0 to offset 7.
Jan 14, 2022 10:23:56 AM org.apache.kafka.clients.consumer.internals.SubscriptionState lambda$requestOffsetReset$3
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1408619537_none-3, groupId=Reader-0_offset_consumer_1408619537_none] Seeking to LATEST offset of partition messenger-0

The content read from the topic is written to a file.
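The output file name in the listing below encodes the window boundaries: output-&lt;windowStart&gt;-&lt;windowEnd&gt;-&lt;shard&gt;-of-&lt;numShards&gt;. A sketch of how such a name can be derived from an event timestamp, assuming one-minute fixed windows and UTC formatting (Beam's windowed file naming is configurable, and the actual file may reflect local time):

```python
from datetime import datetime, timezone

WINDOW_MS = 60_000  # 1-minute fixed windows


def _fmt(ms):
    """Format epoch millis as 'YYYY-MM-DD HH:MM:SS.000' in UTC."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc).strftime(
        "%Y-%m-%d %H:%M:%S.000")


def window_file_name(ets_ms, shard=0, num_shards=1, prefix="output"):
    """Derive a windowed output file name of the form
    <prefix>-<windowStart>-<windowEnd>-<shard>-of-<numShards>."""
    start_ms = ets_ms - (ets_ms % WINDOW_MS)  # floor to window start
    end_ms = start_ms + WINDOW_MS
    return f"{prefix}-{_fmt(start_ms)}-{_fmt(end_ms)}-{shard}-of-{num_shards}"
```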

[aws@localhost Downloads]$ ll
total 292188
drwxr-xr-x.  6 root root        99 Oct 24 07:25 apache-maven-3.6.3
-rw-r--r--.  1 root root   9506321 Jul  3  2020 apache-maven-3.6.3-bin.tar.gz
drwxr-xr-x.  6 aws  aws        134 Oct 24 07:49 Beam-examples-master
-rw-rw-r--.  1 aws  aws      91647 Oct 24 07:46 Beam-examples-master.zip
drwxr-xr-x.  8 aws  aws        119 Oct 24 06:14 kafka_2.11-0.10.0.0
drwxr-xr-x.  7 aws  aws        101 Oct 24 07:52 kafka_2.12-2.5.0
-rw-rw-r--.  1 aws  aws   61604633 Oct 24 06:19 kafka_2.12-2.5.0.tgz
-rw-rw-r--.  1 aws  aws       9352 Oct 24 05:15 kafka-beam-example-master.zip
-rw-r--r--.  1 root root        39 Jan 14 11:05 output-2022-01-14 08:04:00.000-2022-01-14 08:05:00.000-0-of-1
drwxr-xr-x. 13 aws  aws        211 May  1  2019 spark-2.4.3-bin-hadoop2.6
-rwxrw-rw-.  1 aws  aws  227973463 Aug 17 08:36 spark-2.4.3-bin-hadoop2.6.tgz
[aws@localhost Downloads]$