This is a Flume sink implementation that publishes data to a Pulsar topic.
This sink is developed and tested using Apache Flume NG 1.9.0 and Apache Pulsar Client 2.3.0.
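Clone the repository and change into it; later steps copy test files from this checkout:
$ git clone https://github.com/streamnative/flume-ng-pulsar-sink.git
$ cd flume-ng-pulsar-sink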
Start Pulsar in standalone mode in a Docker container:
docker pull apachepulsar/pulsar:2.3.0
docker run -d -it -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-flume-standalone apachepulsar/pulsar:2.3.0 bin/pulsar standalone
Start a consumer that reads messages from the topic flume-test-topic:
docker cp src/test/python/pulsar-flume.py pulsar-flume-standalone:/pulsar
docker exec -it pulsar-flume-standalone /bin/bash
python pulsar-flume.py
Open a new terminal and start a Docker container named flume from the maven:3.6-jdk-8 image, linked to the pulsar-flume-standalone container started in the previous step. This flume container is used to install Flume and the Flume NG Pulsar sink:
docker pull maven:3.6-jdk-8
docker run -d -it --link pulsar-flume-standalone -p 44445:44445 --name flume maven:3.6-jdk-8 /bin/bash
Open a shell in the flume container:
docker exec -it flume /bin/bash
In the flume container, download and unpack Flume 1.9.0:
wget http://apache.01link.hk/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
tar -zxvf apache-flume-1.9.0-bin.tar.gz
Still in the flume container, build the sink and copy the jar into Flume's lib directory:
git clone https://github.com/streamnative/flume-ng-pulsar-sink
cd flume-ng-pulsar-sink
mvn clean package
cd ..
cp flume-ng-pulsar-sink/target/flume-ng-pulsar-sink-1.9.0.jar apache-flume-1.9.0-bin/lib/
exit
Back on the host, in the repository checkout, copy the example configuration files into the flume container and open a shell in it again:
docker cp src/test/resources/flume-example.conf flume:/apache-flume-1.9.0-bin/conf/
docker cp src/test/resources/flume-env.sh flume:/apache-flume-1.9.0-bin/conf/
docker exec -it flume /bin/bash
In the flume container, start the Flume agent a1 with the example configuration:
apache-flume-1.9.0-bin/bin/flume-ng agent --conf apache-flume-1.9.0-bin/conf/ -f apache-flume-1.9.0-bin/conf/flume-example.conf -n a1
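The contents of flume-example.conf are not reproduced in this README. As a rough sketch of what such a file contains, the Python snippet below renders a minimal agent a1 that wires a netcat source on port 44445 through a memory channel to the Pulsar sink. The sink class name, the component names r1, c1 and k1, the 0.0.0.0 bind address and the pulsar:// service URL are assumptions for illustration; check the shipped flume-example.conf for the real values.

```python
# Minimal sketch: render a Flume agent configuration in properties format.
# ASSUMPTIONS: SINK_CLASS, the component names r1/c1/k1 and the default
# service URL are hypothetical; compare them with flume-example.conf.
SINK_CLASS = "org.apache.flume.sink.pulsar.PulsarSink"  # hypothetical class name


def render_agent_config(agent="a1", port=44445,
                        service_url="pulsar://pulsar-flume-standalone:6650",
                        topic="flume-test-topic"):
    """Return the text of a one-source, one-channel, one-sink agent config."""
    props = [
        # Declare the components of the agent.
        (f"{agent}.sources", "r1"),
        (f"{agent}.channels", "c1"),
        (f"{agent}.sinks", "k1"),
        # Netcat source: each newline-terminated line becomes one event.
        # Binding to 0.0.0.0 lets the Docker port mapping reach it.
        (f"{agent}.sources.r1.type", "netcat"),
        (f"{agent}.sources.r1.bind", "0.0.0.0"),
        (f"{agent}.sources.r1.port", str(port)),
        (f"{agent}.sources.r1.channels", "c1"),
        # In-memory channel between the source and the sink.
        (f"{agent}.channels.c1.type", "memory"),
        # Pulsar sink, using option names from the configuration tables.
        (f"{agent}.sinks.k1.type", SINK_CLASS),
        (f"{agent}.sinks.k1.channel", "c1"),
        (f"{agent}.sinks.k1.serviceUrl", service_url),
        (f"{agent}.sinks.k1.topicName", topic),
    ]
    return "\n".join(f"{key} = {value}" for key, value in props) + "\n"


if __name__ == "__main__":
    print(render_agent_config(), end="")
```

Generating the file this way keeps the agent name passed to -n and the property prefixes in sync; editing flume-example.conf by hand works just as well.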
Open another terminal on the host and send some data to port 44445, which is mapped to the flume container:
$ telnet localhost 44445
Trying ::1...
Connected to localhost.
Escape character is '^]'.
hello
OK
world
OK
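If telnet is not available, the same check can be scripted. The sketch below uses only Python's standard library to send each line as one event to the netcat source and collect the reply the source sends back per event (the OK lines above). It assumes the source acknowledges every event, as in the session above; the function name send_events is hypothetical. Unlike telnet, which ends each line with CR LF, it terminates lines with a bare newline.

```python
import socket


def send_events(lines, host="localhost", port=44445, timeout=5.0):
    """Send each line as one event to a Flume netcat source.

    Returns the reply read after each line, normally "OK" when the
    source acknowledges every event.
    """
    replies = []
    with socket.create_connection((host, port), timeout=timeout) as sock:
        with sock.makefile("r", encoding="utf-8", newline="\n") as reader:
            for line in lines:
                # One event per newline-terminated line (bare "\n", no "\r").
                sock.sendall((line + "\n").encode("utf-8"))
                # Wait for the acknowledgement before sending the next event.
                replies.append(reader.readline().rstrip("\r\n"))
    return replies
```

For example, send_events(["hello", "world"]) run on the host should produce the same two messages at the consumer as the telnet session.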
In the terminal running pulsar-flume.py, you will see the following output:
Received message: 'hello'
Received message: 'world'
The flume and pulsar-flume-standalone containers keep running in the background. Remember to stop them at the end of this tutorial:
$ docker ps | grep pulsar-flume-standalone | awk '{ print $1 }' | xargs docker kill
$ docker ps | grep flume | awk '{ print $1 }' | xargs docker kill
To build the sink from source, you need:
- JDK 1.8+
- Apache Maven 3.x
Clone the project from GitHub:
$ git clone https://github.com/streamnative/flume-ng-pulsar-sink.git
Build the sink with Maven:
$ cd flume-ng-pulsar-sink
$ mvn clean package
After a successful build, the jar flume-ng-pulsar-sink-<version>.jar is generated in the target directory.
Copy the built jar into the lib directory of your Flume installation.
Sink options:
Name | Description | Default |
---|---|---|
useAvroEventFormat | Whether to use the Avro format for events | false |
syncMode | Whether to send data to Pulsar synchronously (true) or asynchronously (false) | true |
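A minimal sketch of setting these options in an agent file: it merges overrides onto the defaults from the table above, rejects unknown names (Flume property names are case-sensitive, so a typo is silently ignored otherwise), and renders the property lines. It assumes the options are set directly on the sink as agent.sinks.sink.option; the helper name and the agent/sink names a1 and k1 are hypothetical.

```python
# Documented defaults of the sink-level options, taken from the table.
SINK_DEFAULTS = {"useAvroEventFormat": "false", "syncMode": "true"}


def sink_option_lines(overrides, agent="a1", sink="k1"):
    """Merge overrides onto the defaults and render Flume property lines.

    ASSUMPTION: options live under <agent>.sinks.<sink>.<option>;
    the agent and sink names are hypothetical.
    """
    unknown = set(overrides) - set(SINK_DEFAULTS)
    if unknown:
        # Catch misspelled or wrongly cased option names early.
        raise ValueError(f"unknown sink option(s): {sorted(unknown)}")
    merged = {**SINK_DEFAULTS, **overrides}
    return [f"{agent}.sinks.{sink}.{name} = {value}" for name, value in merged.items()]
```

Calling sink_option_lines({"syncMode": "false"}) yields the useAvroEventFormat default plus syncMode = false, ready to paste into the agent configuration.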
Pulsar client options:
Name | Description | Default |
---|---|---|
serviceUrl | URL of the Pulsar service to connect to | localhost:6650 |
authPluginClassName | Class name of the authentication plugin to use | "" |
authParamsString | Parameters for the authentication plugin as a single string, e.g., "key1:val1,key2:val2" | "" |
tlsCertFile | Path to the TLS certificate file | "" |
tlsKeyFile | Path to the TLS key file | "" |
useTLS | Whether to enable TLS; when enabled, use the pulsar+ssl protocol in serviceUrl | false |
operationTimeout | Operation timeout | 30s |
numIoThreads | Number of threads used to handle connections to brokers | 1 |
numListenerThreads | Number of threads used for message listeners | 1 |
connectionsPerBroker | Maximum number of connections the client library opens to a single broker | 1 |
enableTcpNoDelay | Whether to set the TCP no-delay flag on connections, disabling Nagle's algorithm | false |
tlsTrustCertsFilePath | Path to the trusted TLS certificate file | false |
allowTlsInsecureConnection | Whether the Pulsar client accepts untrusted TLS certificates from the broker | false |
enableTlsHostnameVerification | Whether to enable TLS hostname verification | false |
statsInterval | Interval between client statistics reports | 60 |
maxConcurrentLookupRequests | Maximum number of concurrent lookup requests allowed on each broker connection, to prevent overloading the broker | 60 |
maxLookupRequests | Maximum number of lookup requests allowed on each broker connection, to prevent overloading the broker | 60 |
maxNumberOfRejectedRequestPerConnection | Maximum number of broker-rejected requests within a 30-second window, after which the current connection is closed and the client opens a new one, giving it a chance to connect to a different broker | 50 |
keepAliveIntervalSeconds | Keep-alive interval, in seconds, for each client-broker connection | 30 |
connectionTimeout | Time to wait for a connection to a broker to be established | 30 |
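The useTLS row above couples two options: turning TLS on means the service URL must use the pulsar+ssl scheme. A small pre-flight check, sketched under that rule only, can catch the mismatch before the agent starts; the function name is hypothetical and the host in the usage note is an example.

```python
from urllib.parse import urlparse


def tls_settings_consistent(service_url, use_tls):
    """Check the serviceUrl/useTLS rule from the client options table.

    With useTLS enabled, serviceUrl must use the pulsar+ssl scheme.
    Without TLS, any serviceUrl is accepted by this check.
    """
    if not use_tls:
        return True
    return urlparse(service_url).scheme == "pulsar+ssl"
```

For instance, tls_settings_consistent("pulsar://localhost:6650", True) returns False, flagging a configuration that enables TLS but still points at the plain pulsar:// endpoint.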
Pulsar producer options:
Name | Description | Default |
---|---|---|
topicName | Topic this producer publishes to | "" |
producerName | Name of the producer | "" |
sendTimeout | Send timeout | 30s |
blockIfQueueFull | Whether send and sendAsync block when the outgoing message queue is full | false |
enableBatching | Whether automatic batching of messages is enabled for the producer | true |
batchMessagesMaxMessagesPerBatch | Maximum number of messages in a batch | 1000 |
batchDelay | Maximum time messages are batched before the batch is sent | 1ms |
messageRoutingMode | Message routing mode: SinglePartition, RoundRobinPartition, CustomPartition (0, 1, 2) | 1 |
hashingSchema | Hashing scheme: JavaStringHash, Murmur3_32Hash (0, 1) | 0 |
compressionType | Compression type: NONE, LZ4, ZLIB, ZSTD (0, 1, 2, 3) | 0 |
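The routing, hashing and compression options take integer codes rather than names. The sketch below translates between the two, assuming each code indexes the names in the order the table lists them; the helper names are hypothetical.

```python
# Numeric codes for the producer options, in the order the table lists them.
ENUM_CODES = {
    "messageRoutingMode": ["SinglePartition", "RoundRobinPartition", "CustomPartition"],
    "hashingSchema": ["JavaStringHash", "Murmur3_32Hash"],
    "compressionType": ["NONE", "LZ4", "ZLIB", "ZSTD"],
}


def decode_option(name, code):
    """Translate a numeric producer option value into its enum name."""
    values = ENUM_CODES[name]
    if not 0 <= code < len(values):
        raise ValueError(f"{name}: code {code} is not in 0..{len(values) - 1}")
    return values[code]


def encode_option(name, enum_name):
    """Translate an enum name back into the numeric code for the config."""
    return ENUM_CODES[name].index(enum_name)
```

With the defaults above, messageRoutingMode 1 decodes to RoundRobinPartition, hashingSchema 0 to JavaStringHash, and compressionType 0 to NONE; to enable ZSTD compression you would set compressionType to encode_option("compressionType", "ZSTD"), which is 3.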
This project is licensed under the Apache License 2.0.