Hello World

Be Happy!

ruby-kafka with docker-compose kafka


Set up ruby-kafka

  1. Gem install
    • sudo gem install ruby-kafka 
      Password: 
      Fetching ruby-kafka-1.3.0.gem 
      Fetching digest-crc-0.6.3.gem 
      Building native extensions. This could take a while... 
      Successfully installed digest-crc-0.6.3 
      Successfully installed ruby-kafka-1.3.0 
      Parsing documentation for digest-crc-0.6.3 
      Installing ri documentation for digest-crc-0.6.3 
      Parsing documentation for ruby-kafka-1.3.0 
      Installing ri documentation for ruby-kafka-1.3.0 
      Done installing documentation for digest-crc, ruby-kafka after 2 seconds 
      2 gems installed 
  2. Require gem
    • $ irb
      require "kafka" 

Kafka

  1. Download
    • git clone https://github.com/simplesteph/kafka-stack-docker-compose
  2. Run
    • cd ./kafka-stack-docker-compose
      docker-compose -f zk-single-kafka-single.yml up

Consumer

  1. Create Kafka instance
    • kafka = Kafka.new(["127.0.0.1:9092"], client_id: "my-application")
  2. Receive messages in a loop
    • kafka.each_message(topic: "greetings") do |message|
        puts message.offset, message.key, message.value
      end

irb(main):006:0> require "kafka"
irb(main):007:0> kafka = Kafka.new(["127.0.0.1:9092"]) 
=> #<Kafka::Client:0x00007fc10a0e67a8 @logger=#<Logger:0x00007fc10a0e5e70 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007fc10a0e5bc8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x00007fc10a0e5a38 @default_payload={:client_id=>"ruby-kafka"}, @backend=nil>, @seed_brokers=[#<URI::Generic kafka://127.0.0.1:9092>], @connection_builder=#<Kafka::ConnectionBuilder:0x00007fc10a11dc30 @client_id="ruby-kafka", @logger=#<Logger:0x00007fc10a0e5e70 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007fc10a0e5bc8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x00007fc10a0e5a38 @default_payload={:client_id=>"ruby-kafka"}, @backend=nil>, @connect_timeout=nil, @socket_timeout=nil, @ssl_context=nil, @sasl_authenticator=#<Kafka::SaslAuthenticator:0x00007fc10a11eec8 @logger=#<Logger:0x00007fc10a0e5e70 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007fc10a0e5bc8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @plain=#<Kafka::Sasl::Plain:0x00007fc10a11e860 @logger=#<Logger:0x00007fc10a0e5e70 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007fc10a0e5bc8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @authzid="", @username=nil, @password=nil>, @gssapi=#<Kafka::Sasl::Gssapi:0x00007fc10a11dfa0 @logger=#<Logger:0x00007fc10a0e5e70 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007fc10a0e5bc8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @principal=nil, @keytab=nil>, @scram=#<Kafka::Sasl::Scram:0x00007fc10a11deb0 @semaphore=#<Thread::Mutex:0x00007fc10a11de60>, @username=nil, @password=nil, @logger=#<Logger:0x00007fc10a0e5e70 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007fc10a0e5bc8 @datetime_format=nil>, @formatter=nil, @logdev=nil>>, @oauth=#<Kafka::Sasl::OAuth:0x00007fc10a11ddc0 @logger=#<Logger:0x00007fc10a0e5e70 @level=0, @progname=nil, 
@default_formatter=#<Logger::Formatter:0x00007fc10a0e5bc8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @token_provider=nil>, @mechanism=nil>>, @cluster=#<Kafka::Cluster:0x00007fc10a11d9b0 @logger=#<Logger:0x00007fc10a0e5e70 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007fc10a0e5bc8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @seed_brokers=[#<URI::Generic kafka://127.0.0.1:9092>], @broker_pool=#<Kafka::BrokerPool:0x00007fc10a11daf0 @logger=#<Logger:0x00007fc10a0e5e70 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007fc10a0e5bc8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @connection_builder=#<Kafka::ConnectionBuilder:0x00007fc10a11dc30 @client_id="ruby-kafka", @logger=#<Logger:0x00007fc10a0e5e70 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007fc10a0e5bc8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x00007fc10a0e5a38 @default_payload={:client_id=>"ruby-kafka"}, @backend=nil>, @connect_timeout=nil, @socket_timeout=nil, @ssl_context=nil, @sasl_authenticator=#<Kafka::SaslAuthenticator:0x00007fc10a11eec8 @logger=#<Logger:0x00007fc10a0e5e70 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007fc10a0e5bc8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @plain=#<Kafka::Sasl::Plain:0x00007fc10a11e860 @logger=#<Logger:0x00007fc10a0e5e70 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007fc10a0e5bc8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @authzid="", @username=nil, @password=nil>, @gssapi=#<Kafka::Sasl::Gssapi:0x00007fc10a11dfa0 @logger=#<Logger:0x00007fc10a0e5e70 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007fc10a0e5bc8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @principal=nil, @keytab=nil>, @scram=#<Kafka::Sasl::Scram:0x00007fc10a11deb0 @semaphore=#<Thread::Mutex:0x00007fc10a11de60>, @username=nil, @password=nil, @logger=#<Logger:0x00007fc10a0e5e70 
@level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007fc10a0e5bc8 @datetime_format=nil>, @formatter=nil, @logdev=nil>>, @oauth=#<Kafka::Sasl::OAuth:0x00007fc10a11ddc0 @logger=#<Logger:0x00007fc10a0e5e70 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007fc10a0e5bc8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @token_provider=nil>, @mechanism=nil>>, @brokers={}>, @cluster_info=nil, @stale=true, @target_topics=#<Set: {}>>, @partitioner=#<Kafka::Partitioner:0x00007fc10a11d848>> 
irb(main):009:0> kafka.each_message(topic: "greetings") do |message|
irb(main):010:1*  puts message.offset, message.key, message.value
irb(main):011:1> end
0

Hello, World!a111aaaa

Producer

  1. Create Kafka instance 
    • kafka = Kafka.new(["127.0.0.1:9092"], client_id: "my-application")
  2. Send message
    • kafka.deliver_message("Hello, World!a111aaaa", topic: "greetings")

irb(main):056:0> require "kafka" 
irb(main):057:0> kafka = Kafka.new(["127.0.0.1:9092"], client_id: "my-application")
=> #<Kafka::Client:0x00007f7fc01855a0 @logger=#<Logger:0x00007f7fc0185438 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f7fc01853e8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x00007f7fc0185398 @default_payload={:client_id=>"my-application"}, @backend=nil>, @seed_brokers=[#<URI::Generic kafka://127.0.0.1:9092>], @connection_builder=#<Kafka::ConnectionBuilder:0x00007f7fc0184c40 @client_id="my-application", @logger=#<Logger:0x00007f7fc0185438 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f7fc01853e8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x00007f7fc0185398 @default_payload={:client_id=>"my-application"}, @backend=nil>, @connect_timeout=nil, @socket_timeout=nil, @ssl_context=nil, @sasl_authenticator=#<Kafka::SaslAuthenticator:0x00007f7fc01850f0 @logger=#<Logger:0x00007f7fc0185438 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f7fc01853e8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @plain=#<Kafka::Sasl::Plain:0x00007f7fc0185050 @logger=#<Logger:0x00007f7fc0185438 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f7fc01853e8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @authzid="", @username=nil, @password=nil>, @gssapi=#<Kafka::Sasl::Gssapi:0x00007f7fc0184fd8 @logger=#<Logger:0x00007f7fc0185438 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f7fc01853e8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @principal=nil, @keytab=nil>, @scram=#<Kafka::Sasl::Scram:0x00007f7fc0184f60 @semaphore=#<Thread::Mutex:0x00007f7fc0184f10>, @username=nil, @password=nil, @logger=#<Logger:0x00007f7fc0185438 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f7fc01853e8 @datetime_format=nil>, @formatter=nil, @logdev=nil>>, @oauth=#<Kafka::Sasl::OAuth:0x00007f7fc0184e98 @logger=#<Logger:0x00007f7fc0185438 @level=0, @progname=nil, 
@default_formatter=#<Logger::Formatter:0x00007f7fc01853e8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @token_provider=nil>, @mechanism=nil>>, @cluster=#<Kafka::Cluster:0x00007f7fc0184ad8 @logger=#<Logger:0x00007f7fc0185438 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f7fc01853e8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @seed_brokers=[#<URI::Generic kafka://127.0.0.1:9092>], @broker_pool=#<Kafka::BrokerPool:0x00007f7fc0184bc8 @logger=#<Logger:0x00007f7fc0185438 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f7fc01853e8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @connection_builder=#<Kafka::ConnectionBuilder:0x00007f7fc0184c40 @client_id="my-application", @logger=#<Logger:0x00007f7fc0185438 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f7fc01853e8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x00007f7fc0185398 @default_payload={:client_id=>"my-application"}, @backend=nil>, @connect_timeout=nil, @socket_timeout=nil, @ssl_context=nil, @sasl_authenticator=#<Kafka::SaslAuthenticator:0x00007f7fc01850f0 @logger=#<Logger:0x00007f7fc0185438 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f7fc01853e8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @plain=#<Kafka::Sasl::Plain:0x00007f7fc0185050 @logger=#<Logger:0x00007f7fc0185438 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f7fc01853e8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @authzid="", @username=nil, @password=nil>, @gssapi=#<Kafka::Sasl::Gssapi:0x00007f7fc0184fd8 @logger=#<Logger:0x00007f7fc0185438 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f7fc01853e8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @principal=nil, @keytab=nil>, @scram=#<Kafka::Sasl::Scram:0x00007f7fc0184f60 @semaphore=#<Thread::Mutex:0x00007f7fc0184f10>, @username=nil, @password=nil, 
@logger=#<Logger:0x00007f7fc0185438 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f7fc01853e8 @datetime_format=nil>, @formatter=nil, @logdev=nil>>, @oauth=#<Kafka::Sasl::OAuth:0x00007f7fc0184e98 @logger=#<Logger:0x00007f7fc0185438 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f7fc01853e8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @token_provider=nil>, @mechanism=nil>>, @brokers={}>, @cluster_info=nil, @stale=true, @target_topics=#<Set: {}>>, @partitioner=#<Kafka::Partitioner:0x00007f7fc01849e8>>
irb(main):058:0> kafka.deliver_message("Hello, World!a111aaaa", topic: "greetings")
=> nil

#docker-compose (9) #ruby (13) #docker (30) #kafka (1)
List