Monday, 20 April 2020

Flume Demo: Sink as HDFS


In this blog post, I will explain how to run a Flume agent with HDFS as the sink.

Step 1: 

Create an agent conf file "example-hdfs.conf" as below, with the agent name "a1" and the HDFS sink path "/user/manoj/flume/". Make sure the directory "/user/manoj/flume/" exists in HDFS and is writable by the user running the agent. The channel is memory.
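Before writing the conf file, the target directory can be created and checked from the command line. This is a minimal sketch, assuming the hdfs client is installed and configured for the cluster the agent will write to; the guard around it is my own addition so the snippet does nothing on a machine without Hadoop.

```shell
# Assumption: the hdfs CLI is on PATH and configured for the target cluster.
HDFS_DIR="/user/manoj/flume"

if command -v hdfs >/dev/null 2>&1; then
  # -p creates missing parent directories and does not fail if the path already exists
  hdfs dfs -mkdir -p "$HDFS_DIR"
  # -d lists the directory entry itself, so its owner and permissions are visible
  hdfs dfs -ls -d "$HDFS_DIR"
else
  echo "hdfs command not found; run this on a node with the Hadoop client installed" >&2
fi
```

If the listing shows a different owner than the user who will start the agent, the permissions probably need adjusting before the sink can write there.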

The conf file "example-hdfs.conf" looks as below. Note the channel type (a1.channels.c1.type) and the HDFS location (a1.sinks.k1.hdfs.path):

# example-hdfs.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = hdfs

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Customizing sink for HDFS

a1.sinks.k1.hdfs.path = /user/manoj/flume
a1.sinks.k1.hdfs.filePrefix = netcat

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


Step 2:

Run the Flume agent as below:
flume-ng agent --name a1 --conf /home/manoj/flume/conf --conf-file /home/manoj/flume/conf/example-hdfs.conf
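While testing, it can help to send the agent's log to the console so that sink activity is visible right away. The sketch below adds the -Dflume.root.logger=INFO,console option shown in the Flume user guide; I am assuming a Flume release that still uses log4j 1.x (such as 1.9), and the guard around the command is my own addition.

```shell
# Same paths as in the command above.
AGENT_NAME="a1"
CONF_DIR="/home/manoj/flume/conf"
CONF_FILE="$CONF_DIR/example-hdfs.conf"

if command -v flume-ng >/dev/null 2>&1 && [ -f "$CONF_FILE" ]; then
  # Runs in the foreground; stop it with Ctrl+C.
  flume-ng agent --name "$AGENT_NAME" --conf "$CONF_DIR" --conf-file "$CONF_FILE" \
    -Dflume.root.logger=INFO,console
else
  echo "flume-ng or $CONF_FILE not found; nothing started" >&2
fi
```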

Step 3:

Open a telnet connection to port 44444 (the port configured for the netcat source) from another terminal and type some text. Each line you enter is sent to the agent as one event.
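For a quick non-interactive test, the same lines can be piped to the source with nc instead of typing them into telnet. This is only a sketch: the two sample lines are made up, and nc variants differ in how they handle the -w timeout and end of input, so treat the exact flags as an assumption for your system.

```shell
HOST="localhost"
PORT=44444   # must match a1.sources.r1.port in the conf file

if command -v nc >/dev/null 2>&1; then
  # Each newline-terminated line becomes one Flume event.
  # -w 2 gives up after about two seconds if the agent does not answer.
  printf 'hello flume\nsecond event\n' | nc -w 2 "$HOST" "$PORT" \
    || echo "Could not reach $HOST:$PORT (is the agent running?)" >&2
else
  # Interactive alternative, as used in this post.
  echo "nc not found; run instead: telnet $HOST $PORT"
fi
```

With its default settings the netcat source acknowledges every line it accepts, so an "OK" should come back for each event.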

Step 4:

We can see the data getting written to the HDFS location "/user/manoj/flume". As per the agent configuration file, the channel is memory and the sink is the HDFS location.
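The written files can be checked from the command line as well. This is a minimal sketch, assuming the hdfs client is available; the "netcat" prefix comes from the filePrefix setting in the conf file. A file that is still being written normally carries a ".tmp" suffix until the sink rolls it.

```shell
HDFS_DIR="/user/manoj/flume"
FILE_GLOB="$HDFS_DIR/netcat.*"   # quoted when used, so the HDFS shell expands the pattern

if command -v hdfs >/dev/null 2>&1; then
  # List the files the sink has created so far.
  hdfs dfs -ls "$HDFS_DIR"
  # -text can decode SequenceFile containers, which plain -cat would print as raw bytes.
  hdfs dfs -text "$FILE_GLOB"
else
  echo "hdfs command not found; run this on a node with the Hadoop client installed" >&2
fi
```

With the default file type the event bodies may still show up as encoded bytes rather than the text that was typed.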


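By default the HDFS sink writes its output as SequenceFile. In order to have the data stream written in plain text format, we need to modify the agent conf as below. The only addition compared to the previous configuration is the parameter a1.sinks.k1.hdfs.fileType: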

# example-hdfs.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = hdfs

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Customizing sink for HDFS

a1.sinks.k1.hdfs.path = /user/manoj/flume
a1.sinks.k1.hdfs.filePrefix = netcat
a1.sinks.k1.hdfs.fileType = DataStream

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
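A config like this depends on a few property names being spelled exactly right: the source uses "channels" (plural), the sink uses "channel" (singular), and the sink-specific settings carry the "hdfs." prefix. The helper below is a small sanity check of my own, not part of Flume; check_flume_conf is a hypothetical name and the list of keys is just the ones used in this post.

```shell
# Hypothetical helper: report which of the expected properties are missing from a conf file.
check_flume_conf() {
  conf_file="$1"
  missing=0
  for key in \
    a1.sources.r1.channels \
    a1.sinks.k1.channel \
    a1.sinks.k1.hdfs.path \
    a1.sinks.k1.hdfs.fileType
  do
    # Compare the text before the first "=" (whitespace removed) with the expected key.
    if ! awk -F= -v k="$key" \
         '{ gsub(/[ \t]/, "", $1) } $1 == k { found = 1 } END { exit !found }' \
         "$conf_file"; then
      echo "missing property: $key" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

It could be called as check_flume_conf /home/manoj/flume/conf/example-hdfs.conf before starting the agent; it returns a non-zero status if any of the four keys is absent.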

Please continue to read the follow-up posts as well :)
