
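The downside of open source is that documentation is often thin, and when bugs or outages occur you have to resolve everything with your own staff. No license was purchased, so there is no vendor to fall back on. Development is hard, but configuring and changing the setup is not simple either. My original intent for this blog was to focus on development. Because the configuration is complex, I judged that explaining only the source would not get the point across, so I also cover installation and configuration. The catch is that the content grew too long, so I am writing up the development source and the installation/configuration scripts as separate posts. The goal remains to explain how Apache Beam and Kafka are set up together.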
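Kafka: first demo
Start the ZooKeeper and Kafka processes from kafka_2.12-2.5.0. Note that the commands below pass configuration files from a kafka_2.11-0.10.0.0 directory.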
[root@localhost bin]# pwd
/home/aws/Downloads/kafka_2.12-2.5.0/bin
[root@localhost bin]#
Start ZooKeeper
[root@localhost bin]# ./zookeeper-server-start.sh /home/aws/Downloads/kafka_2.11-0.10.0.0/config/zookeeper.properties
[2022-01-14 09:28:17,339] INFO clientPortAddress is 0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-01-14 09:28:17,340] INFO secureClientPort is not set (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-01-14 09:28:17,348] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2022-01-14 09:28:17,348] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2022-01-14 09:28:17,348] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2022-01-14 09:28:17,348] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2022-01-14 09:28:17,356] INFO Log4j found with jmx enabled. (org.apache.zookeeper.jmx.ManagedUtil)
[2022-01-14 09:28:17,379] INFO Reading configuration from: /home/aws/Downloads/kafka_2.11-0.10.0.0/config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-01-14 09:28:17,379] INFO clientPortAddress is 0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-01-14 09:28:17,379] INFO secureClientPort is not set (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-01-14 09:28:17,379] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2022-01-14 09:28:17,381] INFO zookeeper.snapshot.trust.empty : false (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2022-01-14 09:28:17,395] INFO Server environment:zookeeper.version=3.5.7-f0fdd52973d373ffd9c86b81d99842dc2c7f660e, built on 02/10/2020 11:30 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,395] INFO Server environment:host.name=localhost (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,395] INFO Server environment:java.version=1.8.0_302 (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,395] INFO Server environment:java.vendor=Red Hat, Inc. (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.302.b08-0.el7_9.x86_64/jre (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:java.class.path=/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/activation-1.1.1.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/aopalliance-repackaged-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/argparse4j-0.7.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/audience-annotations-0.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/commons-cli-1.4.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/commons-lang3-3.8.1.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/connect-api-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/connect-basic-auth-extension-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/connect-file-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/connect-json-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/connect-mirror-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/connect-mirror-client-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/connect-runtime-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/connect-transforms-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/hk2-api-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/hk2-locator-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/hk2-utils-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-annotations-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-core-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-databind-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-dataformat-csv-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-datatype-jdk8-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-jaxrs-base-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-jaxrs-json-provider-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-module-jaxb-annotations-2.10.2.jar:/home/aws/Download
s/kafka_2.12-2.5.0/bin/../libs/jackson-module-paranamer-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-module-scala_2.12-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jakarta.activation-api-1.2.1.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jakarta.annotation-api-1.3.4.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jakarta.inject-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jakarta.ws.rs-api-2.1.5.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/javassist-3.22.0-CR2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/javassist-3.26.0-GA.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/javax.servlet-api-3.1.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/javax.ws.rs-api-2.1.1.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jaxb-api-2.3.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jersey-client-2.28.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jersey-common-2.28.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jersey-container-servlet-2.28.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jersey-container-servlet-core-2.28.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jersey-hk2-2.28.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jersey-media-jaxb-2.28.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jersey-server-2.28.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jetty-client-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jetty-continuation-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jetty-http-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jetty-io-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jetty-security-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jetty-server-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jett
y-servlet-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jetty-servlets-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jetty-util-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jopt-simple-5.0.4.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka_2.12-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka_2.12-2.5.0-sources.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka-clients-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka-log4j-appender-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka-streams-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka-streams-examples-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka-streams-scala_2.12-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka-streams-test-utils-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka-tools-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/log4j-1.2.17.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/lz4-java-1.7.1.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/maven-artifact-3.6.3.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/metrics-core-2.2.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/netty-buffer-4.1.45.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/netty-codec-4.1.45.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/netty-common-4.1.45.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/netty-handler-4.1.45.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/netty-resolver-4.1.45.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/netty-transport-4.1.45.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/netty-transport-native-epoll-4.1.45.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/netty-transport-native-unix-common-4.1.45.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/osgi-resource-locator-1.0.1.jar:/
home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/paranamer-2.8.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/plexus-utils-3.2.1.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/reflections-0.9.12.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/rocksdbjni-5.18.3.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/scala-collection-compat_2.12-2.1.3.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/scala-java8-compat_2.12-0.9.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/scala-library-2.12.10.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/scala-logging_2.12-3.9.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/scala-reflect-2.12.10.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/slf4j-api-1.7.30.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/slf4j-log4j12-1.7.30.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/snappy-java-1.1.7.3.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/validation-api-2.0.1.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/zookeeper-3.5.7.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/zookeeper-jute-3.5.7.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/zstd-jni-1.4.4-7.jar (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:java.io.tmpdir=/tmp (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:os.version=3.10.0-1127.el7.x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:user.name=root (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:user.home=/root (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:user.dir=/home/aws/Downloads/kafka_2.12-2.5.0/bin (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:os.memory.free=497MB (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,397] INFO Server environment:os.memory.max=512MB (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,397] INFO Server environment:os.memory.total=512MB (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,398] INFO minSessionTimeout set to 6000 (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,398] INFO maxSessionTimeout set to 60000 (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,398] INFO Created server with tickTime 3000 minSessionTimeout 6000 maxSessionTimeout 60000 datadir /tmp/zookeeper/version-2 snapdir /tmp/zookeeper/version-2 (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,420] INFO Logging initialized @467ms to org.eclipse.jetty.util.log.Slf4jLog (org.eclipse.jetty.util.log)
[2022-01-14 09:28:17,562] WARN o.e.j.s.ServletContextHandler@33f88ab{/,null,UNAVAILABLE} contextPath ends with /* (org.eclipse.jetty.server.handler.ContextHandler)
[2022-01-14 09:28:17,562] WARN Empty contextPath (org.eclipse.jetty.server.handler.ContextHandler)
[2022-01-14 09:28:17,578] INFO jetty-9.4.24.v20191120; built: 2019-11-20T21:37:49.771Z; git: 363d5f2df3a8a28de40604320230664b9c793c16; jvm 1.8.0_302-b08 (org.eclipse.jetty.server.Server)
[2022-01-14 09:28:17,660] INFO DefaultSessionIdManager workerName=node0 (org.eclipse.jetty.server.session)
[2022-01-14 09:28:17,661] INFO No SessionScavenger set, using defaults (org.eclipse.jetty.server.session)
[2022-01-14 09:28:17,663] INFO node0 Scavenging every 660000ms (org.eclipse.jetty.server.session)
[2022-01-14 09:28:17,672] INFO Started o.e.j.s.ServletContextHandler@33f88ab{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler)
[2022-01-14 09:28:17,685] INFO Started ServerConnector@52a86356{HTTP/1.1,[http/1.1]}{0.0.0.0:8080} (org.eclipse.jetty.server.AbstractConnector)
[2022-01-14 09:28:17,686] INFO Started @733ms (org.eclipse.jetty.server.Server)
[2022-01-14 09:28:17,686] INFO Started AdminServer on address 0.0.0.0, port 8080 and command URL /commands (org.apache.zookeeper.server.admin.JettyAdminServer)
[2022-01-14 09:28:17,689] INFO Using org.apache.zookeeper.server.NIOServerCnxnFactory as server connection factory (org.apache.zookeeper.server.ServerCnxnFactory)
[2022-01-14 09:28:17,693] INFO Configuring NIO connection handler with 10s sessionless connection timeout, 1 selector thread(s), 4 worker threads, and 64 kB direct buffers. (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2022-01-14 09:28:17,695] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2022-01-14 09:28:17,709] INFO zookeeper.snapshotSizeFactor = 0.33 (org.apache.zookeeper.server.ZKDatabase)
[2022-01-14 09:28:17,712] INFO Reading snapshot /tmp/zookeeper/version-2/snapshot.10f (org.apache.zookeeper.server.persistence.FileSnap)
[2022-01-14 09:28:17,742] INFO Snapshotting: 0x134 to /tmp/zookeeper/version-2/snapshot.134 (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2022-01-14 09:28:17,777] INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager)
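Most of the startup output above is environment detail. As a rough sketch (not part of the original setup), the helper below reduces a saved copy of that log to the warnings and the lines that report listening ports. The function name `zk_summary` and the idea of saving the log to a file are assumptions made for illustration.

```shell
#!/bin/sh
# Hypothetical helper: summarize a ZooKeeper startup log read from stdin.
# Keeps WARN lines plus the lines that report the client port and the
# AdminServer address, then strips the leading "[timestamp] " prefix.
zk_summary() {
  grep -E ' WARN |binding to port|Started AdminServer' | sed 's/^\[[^]]*] //'
}
```

For example, `zk_summary < zk.log`, where `zk.log` is a hypothetical file holding the captured startup output. On the log above this should leave the standalone-mode warning, the two Jetty contextPath warnings, the AdminServer line (port 8080), and the client port binding (2181).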
Start the Kafka server
[root@localhost bin]# ./kafka-server-start.sh /home/aws/Downloads/kafka_2.11-0.10.0.0/config/server.properties &
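This post never shows the `messenger` topic being created; it may have been auto-created by the broker or created in an earlier session. If you prefer to create it explicitly, something like the following sketch could be used from the `bin` directory. The broker address and topic name are taken from the commands later in this post, the single partition matches the `messenger-0` assignment in the consumer log, and the replication factor of 1 is an assumption for a single-broker setup. The `run` wrapper and `DRY_RUN` variable are hypothetical; by default the script only prints the commands.

```shell
#!/bin/sh
# Sketch: create and inspect a single-partition topic on a one-broker setup.
# BROKER and TOPIC mirror values used elsewhere in this post.
BROKER="192.168.17.133:9092"
TOPIC="messenger"

# Hypothetical wrapper: print the command when DRY_RUN=1 (the default),
# execute it otherwise.
DRY_RUN="${DRY_RUN:-1}"
run() {
  if [ "$DRY_RUN" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# Create the topic, then describe it to confirm partitions and leader.
run ./kafka-topics.sh --bootstrap-server "$BROKER" --create \
    --topic "$TOPIC" --partitions 1 --replication-factor 1
run ./kafka-topics.sh --bootstrap-server "$BROKER" --describe --topic "$TOPIC"
```

Set `DRY_RUN=0` to actually run the commands against the broker.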
Start the producer program
[root@localhost kafka-beam-example-master]# mvn compile exec:java -Dexec.mainClass=com.andrewjones.KafkaProducerExample -Dexec.args="--inputFile=pom.xml --output=producer" -Pdirect-runner
[INFO] Scanning for projects...
[INFO]
[INFO] -----------------< com.andrewjones:kafka-beam-example >-----------------
[INFO] Building kafka-beam-example 0.1
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ kafka-beam-example ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] Copying 1 resource
[INFO]
[INFO] --- maven-compiler-plugin:3.6.1:compile (default-compile) @ kafka-beam-example ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ kafka-beam-example ---
Jan 14, 2022 9:34:03 AM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ProducerConfig values:
acks = 1
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [192.168.17.133:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.LongSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 3
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
Jan 14, 2022 9:34:03 AM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ProducerConfig values:
acks = 1
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [192.168.17.133:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.LongSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 3
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
Jan 14, 2022 9:34:03 AM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ProducerConfig values:
acks = 1
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [192.168.17.133:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.LongSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 3
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
Jan 14, 2022 9:34:04 AM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ProducerConfig values:
acks = 1
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [192.168.17.133:9092]
buffer.memory = 33554432
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.LongSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 3
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
Jan 14, 2022 9:34:04 AM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ProducerConfig values:
acks = 1
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [192.168.17.133:9092]
buffer.memory = 33554432
client.id = producer-2
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.LongSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 3
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
Jan 14, 2022 9:34:04 AM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ProducerConfig values:
acks = 1
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [192.168.17.133:9092]
buffer.memory = 33554432
client.id = producer-3
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.LongSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 3
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
Jan 14, 2022 9:34:04 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 0.10.1.0
Jan 14, 2022 9:34:04 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : 3402a74efb23d1d4
Jan 14, 2022 9:34:04 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 0.10.1.0
Jan 14, 2022 9:34:04 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : 3402a74efb23d1d4
Jan 14, 2022 9:34:04 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 0.10.1.0
Jan 14, 2022 9:34:04 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : 3402a74efb23d1d4
Jan 14, 2022 9:34:04 AM org.apache.kafka.clients.producer.KafkaProducer close
INFO: Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
Jan 14, 2022 9:34:04 AM org.apache.kafka.clients.producer.KafkaProducer close
INFO: Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
Jan 14, 2022 9:34:04 AM org.apache.kafka.clients.producer.KafkaProducer close
INFO: Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 3.275 s
[INFO] Finished at: 2022-01-14T09:34:04-05:00
[INFO] ------------------------------------------------------------------------
[root@localhost kafka-beam-example-master]#
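The run above prints the full ProducerConfig six times, and the dumps appear to differ only in `client.id`. To check that kind of thing without reading every line, a small filter can print each distinct `key = value` pair once for the keys you care about. This is a sketch only; the function name `config_values` and the saved log file are assumptions.

```shell
#!/bin/sh
# Hypothetical helper: read a captured Maven/Kafka client log on stdin and
# print each distinct "key = value" line once, for the keys given as arguments.
config_values() {
  keys="$*"
  awk -v keys="$keys" '
    BEGIN {
      n = split(keys, k, " ")
      for (i = 1; i <= n; i++) want[k[i]] = 1
    }
    # Config lines look like "key = value"; the value may be empty.
    $2 == "=" && ($1 in want) {
      line = $0
      sub(/^[ \t]+/, "", line)          # drop any leading indentation
      if (!(line in seen)) { seen[line] = 1; print line }
    }
  '
}
```

For example, `config_values client.id acks retries < producer.log`, where `producer.log` is a hypothetical file containing the output above.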
Start a consumer process manually. It prints the messages it consumes from the topic.
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.17.133:9092 --topic messenger
[2022-01-14 09:32:20,555] INFO [GroupCoordinator 0]: Preparing to rebalance group console-consumer-14309 in state PreparingRebalance with old generation 0 (__consumer_offsets-18) (reason: Adding new member consumer-console-consumer-14309-1-46f4edc7-b43b-4551-90fe-9d163933cdaa with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-01-14 09:32:23,589] INFO [GroupCoordinator 0]: Stabilized group console-consumer-14309 generation 1 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
[2022-01-14 09:32:23,620] INFO [GroupCoordinator 0]: Assignment received from leader for group console-consumer-14309 for generation 1 (kafka.coordinator.group.GroupCoordinator)
hi
hi there
hi sue bob
hi bob
hi sue
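The three GroupCoordinator lines in the transcript above look like broker log output rather than consumer output, which would happen if the broker was started in the background (`&`) in the same terminal. When working from a saved transcript like this one, a filter along these lines separates the message payloads from the log lines. The function name `only_messages` is an assumption, and the pattern assumes log lines start with a `[YYYY-MM-DD ` timestamp as they do here.

```shell
#!/bin/sh
# Hypothetical helper: drop log lines that begin with "[YYYY-MM-DD "
# so that only the consumed message payloads remain.
only_messages() {
  grep -v -E '^\[[0-9]{4}-[0-9]{2}-[0-9]{2} '
}
```

For example, `only_messages < consumer-session.txt`, where `consumer-session.txt` is a hypothetical copy of the terminal output. A message whose text itself starts with such a timestamp would also be dropped, so treat this as a convenience for reading transcripts, not as a reliable parser.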
Start the consumer program. Unlike the console consumer, it does not print the messages it consumes from the topic.
[root@localhost kafka-beam-example-master]# mvn compile exec:java -Dexec.mainClass=com.andrewjones.KafkaConsumerExample -Dexec.args="--inputFile=pom.xml --output=consumer" -Pdirect-runner
[INFO] Scanning for projects...
[INFO]
[INFO] -----------------< com.andrewjones:kafka-beam-example >-----------------
[INFO] Building kafka-beam-example 0.1
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ kafka-beam-example ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] Copying 1 resource
[INFO]
[INFO] --- maven-compiler-plugin:3.6.1:compile (default-compile) @ kafka-beam-example ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ kafka-beam-example ---
Jan 14, 2022 9:35:00 AM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [192.168.17.133:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id =
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 524288
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
Jan 14, 2022 9:35:00 AM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [192.168.17.133:9092]
check.crcs = true
client.id = consumer-1
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id =
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 524288
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
Jan 14, 2022 9:35:00 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 0.10.1.0
Jan 14, 2022 9:35:00 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : 3402a74efb23d1d4
Jan 14, 2022 9:35:00 AM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource split
INFO: Partitions assigned to split 0 (total 1): messenger-0
Jan 14, 2022 9:35:00 AM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [192.168.17.133:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id =
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 524288
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
Jan 14, 2022 9:35:00 AM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [192.168.17.133:9092]
check.crcs = true
client.id = consumer-2
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id =
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 524288
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
Jan 14, 2022 9:35:00 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 0.10.1.0
Jan 14, 2022 9:35:00 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : 3402a74efb23d1d4
Jan 14, 2022 9:35:01 AM org.apache.kafka.clients.consumer.internals.AbstractCoordinator$GroupCoordinatorResponseHandler onSuccess
INFO: Discovered coordinator 192.168.17.133:9092 (id: 2147483647 rack: null) for group .
Jan 14, 2022 9:35:01 AM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader start
INFO: Reader-0: reading from messenger-0 starting at offset 9
Jan 14, 2022 9:35:01 AM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [192.168.17.133:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = Reader-0_offset_consumer_1442400895_none
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 524288
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
Jan 14, 2022 9:35:01 AM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [192.168.17.133:9092]
check.crcs = true
client.id = consumer-3
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = Reader-0_offset_consumer_1442400895_none
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 524288
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
Jan 14, 2022 9:35:01 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 0.10.1.0
Jan 14, 2022 9:35:01 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : 3402a74efb23d1d4
Jan 14, 2022 9:35:01 AM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader advance
INFO: Reader-0: first record offset 9
Jan 14, 2022 9:35:01 AM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader consumerPollLoop
INFO: Reader-0: Returning from consumer pool loop
Jan 14, 2022 9:35:01 AM org.apache.beam.sdk.io.WriteFiles$WriteShardedBundles processElement
INFO: Opening writer for write operation TextWriteOperation{tempDirectory=/home/aws/Downloads/kafka-beam-example-master/.temp-beam-2022-01-14_14-35-00-1/, windowedWrites=false}
Jan 14, 2022 9:35:01 AM org.apache.beam.sdk.io.WriteFiles$WriteShardedBundles processElement
INFO: Opening writer for write operation TextWriteOperation{tempDirectory=/home/aws/Downloads/kafka-beam-example-master/.temp-beam-2022-01-14_14-35-00-1/, windowedWrites=false}
Jan 14, 2022 9:35:01 AM org.apache.beam.sdk.io.WriteFiles$WriteShardedBundles processElement
INFO: Opening writer for write operation TextWriteOperation{tempDirectory=/home/aws/Downloads/kafka-beam-example-master/.temp-beam-2022-01-14_14-35-00-1/, windowedWrites=false}
Jan 14, 2022 9:35:01 AM org.apache.beam.sdk.io.WriteFiles$2 processElement
INFO: Finalizing write operation TextWriteOperation{tempDirectory=/home/aws/Downloads/kafka-beam-example-master/.temp-beam-2022-01-14_14-35-00-1/, windowedWrites=false}.
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 4.462 s
[INFO] Finished at: 2022-01-14T09:35:01-05:00
[INFO] ------------------------------------------------------------------------
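The run above logs a full ConsumerConfig block for each consumer that gets created, so most of the output is repetition. In the blocks shown here, only client.id and group.id appear to change. A rough sketch for pulling out just those two keys, assuming the Maven output was saved to a file (the function name and the file name run.log are made up for illustration):

```shell
# Print one line per "ConsumerConfig values:" block with its client.id and group.id.
# Assumes the log layout shown above: "key = value" lines following each header line.
summarize_consumer_configs() {
  awk '
    /ConsumerConfig values:/ { n++; next }
    n && /^[ \t]*client\.id =[ \t]*/ { sub(/^[ \t]*client\.id =[ \t]*/, ""); client[n] = $0 }
    n && /^[ \t]*group\.id =[ \t]*/  { sub(/^[ \t]*group\.id =[ \t]*/, "");  group[n] = $0 }
    END {
      for (i = 1; i <= n; i++)
        printf "block %d: client.id=[%s] group.id=[%s]\n", i, client[i], group[i]
    }
  ' "$1"
}
```

Usage would look like `summarize_consumer_configs run.log`. Empty brackets mean the key was logged with no value.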
The results are not printed to the console. Instead, the pipeline writes its output files to a directory.
[aws@localhost kafka-beam-example-master]$ ll
total 40
-rw-r--r--. 1 aws aws 617 Oct 24 2017 docker-compose.yaml
-rw-r--r--. 1 aws aws 1216 Oct 24 2017 Makefile
-rw-r--r--. 1 aws aws 13609 Oct 24 2017 pom.xml
-rw-r--r--. 1 aws aws 515 Oct 24 2017 README.md
drwxr-xr-x. 3 aws aws 18 Oct 24 2017 src
drwxr-xr-x. 5 root root 66 Oct 24 08:53 target
-rw-r--r--. 1 root root 0 Jan 14 09:43 wordcounts-00000-of-00004
-rw-r--r--. 1 root root 7 Jan 14 09:43 wordcounts-00001-of-00004
-rw-r--r--. 1 root root 16 Jan 14 09:43 wordcounts-00002-of-00004
-rw-r--r--. 1 root root 6 Jan 14 09:43 wordcounts-00003-of-00004
[aws@localhost kafka-beam-example-master]$
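The listing shows the output split into four shard files named wordcounts-NNNNN-of-00004, one of which is empty (0 bytes). To read the result as a whole, the shards can simply be concatenated. A minimal sketch, assuming the shards sit in a single directory (the function name is hypothetical):

```shell
# Concatenate all shards of a sharded text output in shard order.
# The prefix "wordcounts" matches the listing above; the directory is passed by the caller.
merge_shards() {
  local dir="$1" prefix="$2"
  # Glob expansion is sorted, so the zero-padded shard numbers come out in order.
  cat "$dir"/"$prefix"-[0-9]*-of-[0-9]*
}
```

For the directory above this would be `merge_shards . wordcounts`.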
Create the topic if needed
[root@localhost bin]# ./kafka-topics.sh --create --zookeeper 192.168.17.133:2181 --replication-factor 1 --partitions 1 --topic messenger
Delete the topic
[root@localhost bin]# ./kafka-topics.sh --zookeeper 192.168.17.133:2181 --delete --topic messenger
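Running the create command twice fails because the topic already exists, and deleting a topic that is not there fails as well. kafka-topics.sh has the options --if-not-exists and --if-exists for these cases; it is worth confirming with --help that your version accepts them. The sketch below wraps the two commands from above. The function names and the KAFKA_TOPICS and ZK variables are assumptions added for illustration, not part of Kafka:

```shell
# Create or delete a topic only when needed, using the same ZooKeeper address as above.
# KAFKA_TOPICS is a hypothetical override for the script path; it defaults to ./kafka-topics.sh.
ZK="${ZK:-192.168.17.133:2181}"

create_topic() {
  "${KAFKA_TOPICS:-./kafka-topics.sh}" --create --if-not-exists \
    --zookeeper "$ZK" --replication-factor 1 --partitions 1 --topic "$1"
}

delete_topic() {
  "${KAFKA_TOPICS:-./kafka-topics.sh}" --delete --if-exists \
    --zookeeper "$ZK" --topic "$1"
}
```

Run from the Kafka bin directory, `create_topic messenger` then issues the same create command as above with the extra guard.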
ZooKeeper's zookeeper.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
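Only three settings are active in this file: the data directory, the client port, and the connection limit. When checking which values a running setup actually uses, it can help to read a single key without opening the file. A minimal sketch (get_prop is a hypothetical helper; it handles plain key=value lines only, with no line continuations or escapes):

```shell
# Print the value of KEY from a Java-style .properties file, ignoring comment lines.
get_prop() {
  local file="$1" key="$2"
  awk -F= -v k="$key" '
    /^[ \t]*#/ { next }                            # skip comment lines
    $1 == k { sub(/^[^=]*=/, ""); print; exit }    # print everything after the first "="
  ' "$file"
}
```

For example, `get_prop zookeeper.properties clientPort` prints the port that clients connect to.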
Kafka's server.properties
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = security_protocol://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.17.133:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
# The number of threads handling network requests
num.network.threads=3
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/home/aws/Downloads/kafka_2.11-0.10.0.0/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
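In this file the broker listens on 192.168.17.133:9092, and because advertised.listeners is commented out, that same address is what it advertises to clients. It matches the bootstrap.servers value in the consumer logs. If the two differ, the client usually cannot connect. A small sketch for checking this, assuming a single listener entry (both function names are hypothetical):

```shell
# Extract host:port from the broker's "listeners" line, e.g. PLAINTEXT://192.168.17.133:9092.
# Assumes a single listener; a comma-separated list would need extra splitting.
listener_hostport() {
  sed -n 's|^listeners=[A-Z_]*://\(.*\)$|\1|p' "$1"
}

# Succeeds when the client's bootstrap address equals the broker's listener address.
check_bootstrap() {
  local props="$1" bootstrap="$2"
  [ "$(listener_hostport "$props")" = "$bootstrap" ]
}
```

A call such as `check_bootstrap server.properties 192.168.17.133:9092 && echo match` would confirm that the two values agree.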
Kafka: second demo
Start ZooKeeper
[root@localhost bin]# ./zookeeper-server-start.sh /home/aws/Downloads/kafka_2.11-0.10.0.0/config/zookeeper.properties
[2022-01-14 09:28:17,339] INFO clientPortAddress is 0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-01-14 09:28:17,340] INFO secureClientPort is not set (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-01-14 09:28:17,348] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2022-01-14 09:28:17,348] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2022-01-14 09:28:17,348] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2022-01-14 09:28:17,348] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2022-01-14 09:28:17,356] INFO Log4j found with jmx enabled. (org.apache.zookeeper.jmx.ManagedUtil)
[2022-01-14 09:28:17,379] INFO Reading configuration from: /home/aws/Downloads/kafka_2.11-0.10.0.0/config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-01-14 09:28:17,379] INFO clientPortAddress is 0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-01-14 09:28:17,379] INFO secureClientPort is not set (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-01-14 09:28:17,379] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2022-01-14 09:28:17,381] INFO zookeeper.snapshot.trust.empty : false (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2022-01-14 09:28:17,395] INFO Server environment:zookeeper.version=3.5.7-f0fdd52973d373ffd9c86b81d99842dc2c7f660e, built on 02/10/2020 11:30 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,395] INFO Server environment:host.name=localhost (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,395] INFO Server environment:java.version=1.8.0_302 (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,395] INFO Server environment:java.vendor=Red Hat, Inc. (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.302.b08-0.el7_9.x86_64/jre (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:java.class.path=/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/activation-1.1.1.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/aopalliance-repackaged-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/argparse4j-0.7.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/audience-annotations-0.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/commons-cli-1.4.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/commons-lang3-3.8.1.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/connect-api-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/connect-basic-auth-extension-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/connect-file-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/connect-json-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/connect-mirror-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/connect-mirror-client-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/connect-runtime-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/connect-transforms-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/hk2-api-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/hk2-locator-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/hk2-utils-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-annotations-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-core-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-databind-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-dataformat-csv-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-datatype-jdk8-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-jaxrs-base-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-jaxrs-json-provider-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-module-jaxb-annotations-2.10.2.jar:/home/aws/Download
s/kafka_2.12-2.5.0/bin/../libs/jackson-module-paranamer-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jackson-module-scala_2.12-2.10.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jakarta.activation-api-1.2.1.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jakarta.annotation-api-1.3.4.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jakarta.inject-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jakarta.ws.rs-api-2.1.5.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/javassist-3.22.0-CR2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/javassist-3.26.0-GA.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/javax.servlet-api-3.1.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/javax.ws.rs-api-2.1.1.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jaxb-api-2.3.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jersey-client-2.28.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jersey-common-2.28.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jersey-container-servlet-2.28.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jersey-container-servlet-core-2.28.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jersey-hk2-2.28.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jersey-media-jaxb-2.28.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jersey-server-2.28.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jetty-client-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jetty-continuation-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jetty-http-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jetty-io-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jetty-security-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jetty-server-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jett
y-servlet-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jetty-servlets-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jetty-util-9.4.24.v20191120.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/jopt-simple-5.0.4.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka_2.12-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka_2.12-2.5.0-sources.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka-clients-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka-log4j-appender-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka-streams-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka-streams-examples-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka-streams-scala_2.12-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka-streams-test-utils-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/kafka-tools-2.5.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/log4j-1.2.17.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/lz4-java-1.7.1.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/maven-artifact-3.6.3.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/metrics-core-2.2.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/netty-buffer-4.1.45.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/netty-codec-4.1.45.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/netty-common-4.1.45.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/netty-handler-4.1.45.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/netty-resolver-4.1.45.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/netty-transport-4.1.45.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/netty-transport-native-epoll-4.1.45.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/netty-transport-native-unix-common-4.1.45.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/osgi-resource-locator-1.0.1.jar:/
home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/paranamer-2.8.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/plexus-utils-3.2.1.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/reflections-0.9.12.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/rocksdbjni-5.18.3.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/scala-collection-compat_2.12-2.1.3.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/scala-java8-compat_2.12-0.9.0.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/scala-library-2.12.10.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/scala-logging_2.12-3.9.2.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/scala-reflect-2.12.10.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/slf4j-api-1.7.30.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/slf4j-log4j12-1.7.30.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/snappy-java-1.1.7.3.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/validation-api-2.0.1.Final.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/zookeeper-3.5.7.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/zookeeper-jute-3.5.7.jar:/home/aws/Downloads/kafka_2.12-2.5.0/bin/../libs/zstd-jni-1.4.4-7.jar (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:java.io.tmpdir=/tmp (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:os.version=3.10.0-1127.el7.x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:user.name=root (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:user.home=/root (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:user.dir=/home/aws/Downloads/kafka_2.12-2.5.0/bin (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,396] INFO Server environment:os.memory.free=497MB (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,397] INFO Server environment:os.memory.max=512MB (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,397] INFO Server environment:os.memory.total=512MB (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,398] INFO minSessionTimeout set to 6000 (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,398] INFO maxSessionTimeout set to 60000 (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,398] INFO Created server with tickTime 3000 minSessionTimeout 6000 maxSessionTimeout 60000 datadir /tmp/zookeeper/version-2 snapdir /tmp/zookeeper/version-2 (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-14 09:28:17,420] INFO Logging initialized @467ms to org.eclipse.jetty.util.log.Slf4jLog (org.eclipse.jetty.util.log)
[2022-01-14 09:28:17,562] WARN o.e.j.s.ServletContextHandler@33f88ab{/,null,UNAVAILABLE} contextPath ends with /* (org.eclipse.jetty.server.handler.ContextHandler)
[2022-01-14 09:28:17,562] WARN Empty contextPath (org.eclipse.jetty.server.handler.ContextHandler)
[2022-01-14 09:28:17,578] INFO jetty-9.4.24.v20191120; built: 2019-11-20T21:37:49.771Z; git: 363d5f2df3a8a28de40604320230664b9c793c16; jvm 1.8.0_302-b08 (org.eclipse.jetty.server.Server)
[2022-01-14 09:28:17,660] INFO DefaultSessionIdManager workerName=node0 (org.eclipse.jetty.server.session)
[2022-01-14 09:28:17,661] INFO No SessionScavenger set, using defaults (org.eclipse.jetty.server.session)
[2022-01-14 09:28:17,663] INFO node0 Scavenging every 660000ms (org.eclipse.jetty.server.session)
[2022-01-14 09:28:17,672] INFO Started o.e.j.s.ServletContextHandler@33f88ab{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler)
[2022-01-14 09:28:17,685] INFO Started ServerConnector@52a86356{HTTP/1.1,[http/1.1]}{0.0.0.0:8080} (org.eclipse.jetty.server.AbstractConnector)
[2022-01-14 09:28:17,686] INFO Started @733ms (org.eclipse.jetty.server.Server)
[2022-01-14 09:28:17,686] INFO Started AdminServer on address 0.0.0.0, port 8080 and command URL /commands (org.apache.zookeeper.server.admin.JettyAdminServer)
[2022-01-14 09:28:17,689] INFO Using org.apache.zookeeper.server.NIOServerCnxnFactory as server connection factory (org.apache.zookeeper.server.ServerCnxnFactory)
[2022-01-14 09:28:17,693] INFO Configuring NIO connection handler with 10s sessionless connection timeout, 1 selector thread(s), 4 worker threads, and 64 kB direct buffers. (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2022-01-14 09:28:17,695] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2022-01-14 09:28:17,709] INFO zookeeper.snapshotSizeFactor = 0.33 (org.apache.zookeeper.server.ZKDatabase)
[2022-01-14 09:28:17,712] INFO Reading snapshot /tmp/zookeeper/version-2/snapshot.10f (org.apache.zookeeper.server.persistence.FileSnap)
[2022-01-14 09:28:17,742] INFO Snapshotting: 0x134 to /tmp/zookeeper/version-2/snapshot.134 (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2022-01-14 09:28:17,777] INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager)
Start the Kafka server
[root@localhost bin]# ./kafka-server-start.sh /home/aws/Downloads/kafka_2.11-0.10.0.0/config/server.properties &
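Because the broker is started in the background with &, the prompt returns before the broker is actually accepting connections. A rough sketch for waiting until the port is open before starting the consumer. It relies on bash's /dev/tcp feature, so it needs bash rather than plain sh, and wait_for_port is a hypothetical helper:

```shell
# Wait until HOST:PORT accepts TCP connections, trying once per second up to TRIES times.
# Returns 0 as soon as a connection succeeds, 1 if every attempt fails.
wait_for_port() {
  local host="$1" port="$2" tries="${3:-30}"
  while [ "$tries" -gt 0 ]; do
    # The subshell opens and immediately closes a connection; failure leaves a non-zero status.
    if (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null; then
      return 0
    fi
    tries=$((tries - 1))
    sleep 1
  done
  return 1
}
```

With the address used in this post, that would be `wait_for_port 192.168.17.133 9092 && echo "broker is up"`.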
Build and run the consumer program
[root@localhost Beam-examples-master]# mvn compile exec:java -Dexec.mainClass=com.sunil.WindowedWordCount -Pdirect-runner -Dexec.args="--output=/home/aws/Downloads/output/"
[INFO] Scanning for projects...
[WARNING]
[WARNING] Some problems were encountered while building the effective model for com.sunil:beam-examples:jar:0.1
[WARNING] 'build.plugins.plugin.(groupId:artifactId)' must be unique but found duplicate declaration of plugin org.apache.maven.plugins:maven-jar-plugin @ line 124, column 21
[WARNING]
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING]
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING]
[INFO]
[INFO] ----------------------< com.sunil:beam-examples >-----------------------
[INFO] Building beam-examples 0.1
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ beam-examples ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /home/aws/Downloads/Beam-examples-master/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.7.0:compile (default-compile) @ beam-examples ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- exec-maven-plugin:1.6.0:java (default-cli) @ beam-examples ---
Oct 24, 2021 8:31:39 AM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [192.168.17.133:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = null
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 524288
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
Oct 24, 2021 8:31:39 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version: 2.4.0
Oct 24, 2021 8:31:39 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId: 77a89fcf8d7fa018
Oct 24, 2021 8:31:39 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka startTimeMs: 1635078699859
Oct 24, 2021 8:31:40 AM org.apache.kafka.clients.Metadata update
INFO: [Consumer clientId=consumer-1, groupId=null] Cluster ID: NVfRzZzESxOcPYrXHstbLg
Oct 24, 2021 8:31:40 AM org.apache.beam.sdk.io.kafka.KafkaUnboundedSource split
INFO: Partitions assigned to split 0 (total 1): messenger-0
Oct 24, 2021 8:31:40 AM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [192.168.17.133:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = null
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 524288
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
Oct 24, 2021 8:31:40 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version: 2.4.0
Oct 24, 2021 8:31:40 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId: 77a89fcf8d7fa018
Oct 24, 2021 8:31:40 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka startTimeMs: 1635078700243
Oct 24, 2021 8:31:40 AM org.apache.kafka.clients.consumer.KafkaConsumer assign
INFO: [Consumer clientId=consumer-2, groupId=null] Subscribed to partition(s): messenger-0
Oct 24, 2021 8:31:40 AM org.apache.kafka.clients.Metadata update
INFO: [Consumer clientId=consumer-2, groupId=null] Cluster ID: NVfRzZzESxOcPYrXHstbLg
Oct 24, 2021 8:31:40 AM org.apache.kafka.clients.consumer.internals.SubscriptionState maybeSeekUnvalidated
INFO: [Consumer clientId=consumer-2, groupId=null] Resetting offset for partition messenger-0 to offset 0.
Oct 24, 2021 8:31:40 AM org.apache.beam.sdk.io.kafka.KafkaUnboundedReader start
INFO: Reader-0: reading from messenger-0 starting at offset 0
Oct 24, 2021 8:31:40 AM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [192.168.17.133:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = Reader-0_offset_consumer_1217782153_none
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 524288
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
Oct 24, 2021 8:31:40 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version: 2.4.0
Oct 24, 2021 8:31:40 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId: 77a89fcf8d7fa018
Oct 24, 2021 8:31:40 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka startTimeMs: 1635078700588
Oct 24, 2021 8:31:40 AM org.apache.kafka.clients.consumer.KafkaConsumer assign
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1217782153_none-3, groupId=Reader-0_offset_consumer_1217782153_none] Subscribed to partition(s): messenger-0
Oct 24, 2021 8:31:40 AM org.apache.kafka.clients.consumer.internals.SubscriptionState lambda$requestOffsetReset$3
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1217782153_none-3, groupId=Reader-0_offset_consumer_1217782153_none] Seeking to LATEST offset of partition messenger-0
Oct 24, 2021 8:31:40 AM org.apache.kafka.clients.Metadata update
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1217782153_none-3, groupId=Reader-0_offset_consumer_1217782153_none] Cluster ID: NVfRzZzESxOcPYrXHstbLg
Oct 24, 2021 8:31:40 AM org.apache.kafka.clients.consumer.internals.SubscriptionState maybeSeekUnvalidated
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1217782153_none-3, groupId=Reader-0_offset_consumer_1217782153_none] Resetting offset for partition messenger-0 to offset 0.
Oct 24, 2021 8:31:40 AM org.apache.kafka.clients.consumer.internals.SubscriptionState lambda$requestOffsetReset$3
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1217782153_none-3, groupId=Reader-0_offset_consumer_1217782153_none] Seeking to LATEST offset of partition messenger-0
Oct 24, 2021 8:31:40 AM org.apache.kafka.clients.consumer.internals.SubscriptionState maybeSeekUnvalidated
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1217782153_none-3, groupId=Reader-0_offset_consumer_1217782153_none] Resetting offset for partition messenger-0 to offset 0.
Oct 24, 2021 8:31:41 AM org.apache.kafka.clients.consumer.internals.SubscriptionState lambda$requestOffsetReset$3
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1217782153_none-3, groupId=Reader-0_offset_consumer_1217782153_none] Seeking to LATEST offset of partition messenger-0
Oct 24, 2021 8:31:41 AM org.apache.kafka.clients.consumer.internals.SubscriptionState maybeSeekUnvalidated
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1217782153_none-3, groupId=Reader-0_offset_consumer_1217782153_none] Resetting offset for partition messenger-0 to offset 0.
Oct 24, 2021 8:31:42 AM org.apache.kafka.clients.consumer.internals.SubscriptionState lambda$requestOffsetReset$3
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1217782153_none-3, groupId=Reader-0_offset_consumer_1217782153_none] Seeking to LATEST offset of partition messenger-0
Run the producer program manually
[root@localhost scripts]# python3 GenMessage.py krafton
{"name":"krafton","message": "This message is from krafton","ets": 1642173571124}
>>{"name":"krafton","message": "This message is from krafton","ets": 1642173571124}
message sent!
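The source of GenMessage.py is not shown here, so the following is only a sketch of how a script could produce a message of the same shape. The function names `build_message` and `send_message` are hypothetical, the `ets` field is assumed to be the current time in epoch milliseconds, and the sending part assumes the kafka-python package, which may not be what the real script uses. The topic name and broker address are taken from the logs above.

```python
import json
import time


def build_message(name, now_ms=None):
    """Build the JSON payload; field names follow the output shown above."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)  # assumption: "ets" is epoch milliseconds
    payload = {
        "name": name,
        "message": f"This message is from {name}",
        "ets": now_ms,
    }
    return json.dumps(payload)


def send_message(text, topic="messenger", bootstrap="192.168.17.133:9092"):
    """Hypothetical sender; assumes the kafka-python package is installed."""
    from kafka import KafkaProducer  # imported lazily so build_message works without it

    producer = KafkaProducer(bootstrap_servers=bootstrap)
    producer.send(topic, value=text.encode("utf-8"))
    producer.flush()  # block until the record has actually been sent
    producer.close()
```

Calling `send_message(build_message("krafton"))` would publish one record to the `messenger` topic. Note that `json.dumps` spaces the fields slightly differently from the output above, which does not matter to a JSON parser.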
Run the console consumer process manually
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.17.133:9092 --topic messenger
{"name":"philipjung","message": "This message is from philipjung","ets": 1635079382057}
{"name":"krafton","message": "This message is from krafton","ets": 1635079399238}
{"name":"krafton","message": "This message is from krafton","ets": 1642173571124}
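Each line printed by the console consumer is a JSON document, and the `ets` values look like epoch timestamps in milliseconds. As a small sketch under that assumption, the lines can be decoded and the timestamp converted to a readable UTC time. The helper names `ets_to_utc` and `decode_line` are made up for illustration.

```python
import json
from datetime import datetime, timezone


def ets_to_utc(ets_ms):
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    # Split with integer arithmetic to avoid floating-point rounding of the millis
    seconds, millis = divmod(ets_ms, 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(
        microsecond=millis * 1000
    )


def decode_line(line):
    """Parse one consumed line and return (name, message, event time in UTC)."""
    record = json.loads(line)
    return record["name"], record["message"], ets_to_utc(record["ets"])
```

With this, the last line above (`"ets": 1642173571124`) decodes to 2022-01-14 15:19:31.124 UTC, while the first two lines carry timestamps from 2021-10-24.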
Stop the consumer process so that the producer program can be tested.
Run the producer program manually.
[root@localhost scripts]# python3 GenMessage.py philipjung1234
{"name":"philipjung1234","message": "This message is from philipjung1234","ets": 1642173830085}
>>{"name":"philipjung1234","message": "This message is from philipjung1234","ets": 1642173830085}
message sent!
The consumer program reads the message from the topic and prints it
Jan 14, 2022 10:23:51 AM org.apache.kafka.clients.consumer.internals.SubscriptionState maybeSeekUnvalidated
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1408619537_none-3, groupId=Reader-0_offset_consumer_1408619537_none] Resetting offset for partition messenger-0 to offset 6.
arg1: {"name":"philipjung1234","message": "This message is from philipjung1234","ets": 1642173830085}arg0: messenger
Jan 14, 2022 10:23:52 AM org.apache.kafka.clients.consumer.internals.SubscriptionState lambda$requestOffsetReset$3
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1408619537_none-3, groupId=Reader-0_offset_consumer_1408619537_none] Seeking to LATEST offset of partition messenger-0
Jan 14, 2022 10:23:52 AM org.apache.kafka.clients.consumer.internals.SubscriptionState maybeSeekUnvalidated
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1408619537_none-3, groupId=Reader-0_offset_consumer_1408619537_none] Resetting offset for partition messenger-0 to offset 7.
Jan 14, 2022 10:23:53 AM org.apache.kafka.clients.consumer.internals.SubscriptionState lambda$requestOffsetReset$3
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1408619537_none-3, groupId=Reader-0_offset_consumer_1408619537_none] Seeking to LATEST offset of partition messenger-0
Jan 14, 2022 10:23:53 AM org.apache.kafka.clients.consumer.internals.SubscriptionState maybeSeekUnvalidated
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1408619537_none-3, groupId=Reader-0_offset_consumer_1408619537_none] Resetting offset for partition messenger-0 to offset 7.
Jan 14, 2022 10:23:54 AM org.apache.kafka.clients.consumer.internals.SubscriptionState lambda$requestOffsetReset$3
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1408619537_none-3, groupId=Reader-0_offset_consumer_1408619537_none] Seeking to LATEST offset of partition messenger-0
Jan 14, 2022 10:23:54 AM org.apache.kafka.clients.consumer.internals.SubscriptionState maybeSeekUnvalidated
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1408619537_none-3, groupId=Reader-0_offset_consumer_1408619537_none] Resetting offset for partition messenger-0 to offset 7.
Jan 14, 2022 10:23:55 AM org.apache.kafka.clients.consumer.internals.SubscriptionState lambda$requestOffsetReset$3
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1408619537_none-3, groupId=Reader-0_offset_consumer_1408619537_none] Seeking to LATEST offset of partition messenger-0
Jan 14, 2022 10:23:55 AM org.apache.kafka.clients.consumer.internals.SubscriptionState maybeSeekUnvalidated
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1408619537_none-3, groupId=Reader-0_offset_consumer_1408619537_none] Resetting offset for partition messenger-0 to offset 7.
Jan 14, 2022 10:23:56 AM org.apache.kafka.clients.consumer.internals.SubscriptionState lambda$requestOffsetReset$3
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1408619537_none-3, groupId=Reader-0_offset_consumer_1408619537_none] Seeking to LATEST offset of partition messenger-0
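In the log above, the offset reported for `messenger-0` moves from 6 to 7 around the time the new message is printed, which is consistent with one record having been appended to the partition. If the log is saved to a file, these values can be pulled out with a regular expression. This is a minimal sketch that assumes the exact log wording shown here. The name `extract_offsets` is hypothetical.

```python
import re

# Matches lines such as:
#   "... Resetting offset for partition messenger-0 to offset 7."
OFFSET_RE = re.compile(
    r"Resetting offset for partition (?P<partition>\S+) to offset (?P<offset>\d+)\."
)


def extract_offsets(log_text):
    """Return (partition, offset) pairs in the order they appear in the log."""
    return [
        (m.group("partition"), int(m.group("offset")))
        for m in OFFSET_RE.finditer(log_text)
    ]
```

A change between consecutive offsets for the same partition is a quick way to confirm that the producer's message actually reached the topic.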
The consumer writes what it read to a file.
[aws@localhost Downloads]$ ll
total 292188
drwxr-xr-x. 6 root root 99 Oct 24 07:25 apache-maven-3.6.3
-rw-r--r--. 1 root root 9506321 Jul 3 2020 apache-maven-3.6.3-bin.tar.gz
drwxr-xr-x. 6 aws aws 134 Oct 24 07:49 Beam-examples-master
-rw-rw-r--. 1 aws aws 91647 Oct 24 07:46 Beam-examples-master.zip
drwxr-xr-x. 8 aws aws 119 Oct 24 06:14 kafka_2.11-0.10.0.0
drwxr-xr-x. 7 aws aws 101 Oct 24 07:52 kafka_2.12-2.5.0
-rw-rw-r--. 1 aws aws 61604633 Oct 24 06:19 kafka_2.12-2.5.0.tgz
-rw-rw-r--. 1 aws aws 9352 Oct 24 05:15 kafka-beam-example-master.zip
-rw-r--r--. 1 root root 39 Jan 14 11:05 output-2022-01-14 08:04:00.000-2022-01-14 08:05:00.000-0-of-1
drwxr-xr-x. 13 aws aws 211 May 1 2019 spark-2.4.3-bin-hadoop2.6
-rwxrw-rw-. 1 aws aws 227973463 Aug 17 08:36 spark-2.4.3-bin-hadoop2.6.tgz
[aws@localhost Downloads]$
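The output file name in the listing appears to encode the window start, the window end, and the shard number (`0-of-1`). As a sketch under that assumption, the name can be split into its parts to see which window a file belongs to. The pattern below is inferred only from the single file name shown above, and `parse_output_name` is a hypothetical helper.

```python
import re
from datetime import datetime

TS = r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}"
NAME_RE = re.compile(
    rf"^(?P<prefix>.+?)-(?P<start>{TS})-(?P<end>{TS})-(?P<shard>\d+)-of-(?P<shards>\d+)$"
)


def parse_output_name(filename):
    """Split a windowed output file name into prefix, window bounds and shard info."""
    m = NAME_RE.match(filename)
    if m is None:
        raise ValueError(f"unexpected file name: {filename!r}")
    fmt = "%Y-%m-%d %H:%M:%S.%f"
    start = datetime.strptime(m.group("start"), fmt)
    end = datetime.strptime(m.group("end"), fmt)
    return {
        "prefix": m.group("prefix"),
        "window_start": start,
        "window_end": end,
        "window_seconds": (end - start).total_seconds(),
        "shard": int(m.group("shard")),
        "shards": int(m.group("shards")),
    }
```

For the file above, this gives a window from 08:04:00 to 08:05:00 on 2022-01-14, that is, a one-minute window written as a single shard.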