- Expert AWS Development
- Atul V. Mistry
- 522 words
- 2025-02-24 00:33:26
Amazon Kinesis Firehose
Amazon Kinesis Firehose is a fully managed, highly available, and durable service that loads real-time streaming data into AWS services such as Amazon S3, Amazon Redshift, or Amazon Elasticsearch Service. It synchronously replicates your data across three facilities and scales automatically to match the throughput of your data. You can also compress your data and encrypt it before it is loaded.
The AWS SDKs for Java, Node.js, Python, .NET, and Ruby can be used to send data to a Kinesis Firehose delivery stream through the Kinesis Firehose API.
The Kinesis Firehose API provides two operations to send data to the Kinesis Firehose delivery stream:
- PutRecord: Sends a single data record in one call
- PutRecordBatch: Sends multiple data records in one call
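A single PutRecordBatch call accepts at most 500 records, so a larger set of records has to be split into several calls. The following is a minimal sketch of that splitting step using only the JDK. The class name BatchChunker and the method chunk are hypothetical, and the sketch checks only the record count, not the size limit of a request.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: split a list of payloads into groups small enough for one
// PutRecordBatch call. BatchChunker and chunk are hypothetical names.
final class BatchChunker {
    // PutRecordBatch accepts at most 500 records per call.
    static final int MAX_RECORDS_PER_BATCH = 500;

    static <T> List<List<T>> chunk(List<T> items, int maxPerBatch) {
        if (maxPerBatch <= 0) {
            throw new IllegalArgumentException("maxPerBatch must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += maxPerBatch) {
            int end = Math.min(start + maxPerBatch, items.size());
            // Copy the slice so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> payloads = new ArrayList<>();
        for (int i = 0; i < 1200; i++) {
            payloads.add("record-" + i);
        }
        List<List<String>> batches = chunk(payloads, MAX_RECORDS_PER_BATCH);
        // Each inner list would be sent with one PutRecordBatch call.
        System.out.println("Number of batches: " + batches.size()); // prints 3
    }
}
```

With 1,200 payloads the sketch produces batches of 500, 500, and 200 items.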
Let's explore an example using PutRecord. This example assumes that a delivery stream named MyFirehoseStream has already been created. You can use the Eclipse IDE to work through it.
You need to import a few classes: AmazonKinesisFirehoseClient, which creates the client for accessing Firehose, and PutRecordRequest and PutRecordResult, which represent the request to put a record into the stream and its result. The client is held in a static field:
private static AmazonKinesisFirehoseClient client;
Create the AmazonKinesisFirehoseClient instance, client, and assign your credentials and region to it. Then prepare the data, the name of the delivery stream, and the record:
String data = "My Kinesis Firehose data";
String myFirehoseStream = "MyFirehoseStream";
Record record = new Record();
record.setData(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)));
As mentioned earlier, the MyFirehoseStream delivery stream has already been created; the myFirehoseStream variable holds its name.
A record is the unit of data in a delivery stream. The setData method takes the data blob as a ByteBuffer. The blob is Base64-encoded in the request, but the AWS SDK for Java performs that encoding on this field before it sends the request to the AWS service, so you pass the raw bytes and do not encode them yourself.
A ByteBuffer returned by the SDK is mutable. If you change its content or position, the change is visible to every object that holds a reference to it. It is best practice to call ByteBuffer.duplicate() or ByteBuffer.asReadOnlyBuffer() before reading from the buffer or using it.
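To make the encoding step concrete, here is a small JDK-only sketch of what Base64 encoding does to a data blob. It is for illustration only: the SDK applies the encoding itself, so application code should not do this before calling setData. The class BlobEncodingDemo and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Sketch: what Base64 encoding of a data blob looks like.
// Illustration only; the SDK encodes the blob for you.
final class BlobEncodingDemo {
    static String toBase64(ByteBuffer blob) {
        // Read through a duplicate so the caller's position is not moved.
        ByteBuffer copy = blob.duplicate();
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return Base64.getEncoder().encodeToString(bytes);
    }

    static String fromBase64(String encoded) {
        byte[] bytes = Base64.getDecoder().decode(encoded);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String data = "My Kinesis Firehose data";
        ByteBuffer blob = ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8));
        String encoded = toBase64(blob);
        System.out.println("Encoded: " + encoded);
        System.out.println("Decoded: " + fromBase64(encoded));
    }
}
```

Base64 turns every 3 input bytes into 4 output characters, so the 24-byte sample string becomes 32 characters.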
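The following JDK-only sketch illustrates that advice. It shows that reading through a read-only view leaves the original buffer's position untouched, that a read-only view rejects writes, and that duplicate() shares the underlying content rather than copying it. The class BufferSafetyDemo and its method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;
import java.nio.charset.StandardCharsets;

// Sketch: reading a shared ByteBuffer without disturbing other holders.
final class BufferSafetyDemo {
    // Decode through a read-only view; only the view's position advances.
    static String readSafely(ByteBuffer shared) {
        ByteBuffer view = shared.asReadOnlyBuffer();
        return StandardCharsets.UTF_8.decode(view).toString();
    }

    // A read-only view throws instead of modifying the shared content.
    static boolean rejectsWrites(ByteBuffer shared) {
        try {
            shared.asReadOnlyBuffer().put(0, (byte) 'X');
            return false;
        } catch (ReadOnlyBufferException e) {
            return true;
        }
    }

    // duplicate() gives an independent position and limit, but the bytes are
    // shared. Assumes a non-empty, writable buffer.
    static boolean duplicateSharesContent(ByteBuffer shared) {
        ByteBuffer dup = shared.duplicate();
        byte original = shared.get(0);
        dup.put(0, (byte) (original + 1));
        boolean visible = shared.get(0) != original;
        dup.put(0, original); // restore the original byte
        return visible;
    }

    public static void main(String[] args) {
        ByteBuffer data = ByteBuffer.wrap(
            "My Kinesis Firehose data".getBytes(StandardCharsets.UTF_8));
        int before = data.remaining();
        System.out.println(readSafely(data));
        System.out.println("Position unchanged: " + (data.remaining() == before));
        System.out.println("Read-only view rejects writes: " + rejectsWrites(data));
        System.out.println("Duplicate shares content: " + duplicateSharesContent(data));
    }
}
```

If other code must not be able to change the bytes at all, asReadOnlyBuffer() is the safer of the two calls, because a duplicate still writes through to the same content.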
Now create the PutRecordRequest instance, specifying the name of the delivery stream and the data record:
PutRecordRequest putRecordRequest = new PutRecordRequest()
    .withDeliveryStreamName(myFirehoseStream)
    .withRecord(record);
PutRecordResult putRecordResult = client.putRecord(putRecordRequest);
System.out.println("Put Request Record ID: " + putRecordResult.getRecordId());
The putRecord call writes a single record into the delivery stream and returns a PutRecordResult, from which the record ID is printed. To send multiple records in one call, create a PutRecordBatchRequest instead:
PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest()
    .withDeliveryStreamName("MyFirehoseStream")
    .withRecords(getBatchRecords());
To create the PutRecordBatchRequest instance, you specify the name of the delivery stream and the data records. The getBatchRecords method supplies the multiple records, as shown in the next step:
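private static List<Record> getBatchRecords() {
    List<Record> records = new ArrayList<>();
    JSONObject jsonObject = new JSONObject();
    jsonObject.put("userid", "userid_1");
    jsonObject.put("password", "password1");
    Record record = new Record().withData(ByteBuffer.wrap(jsonObject.toString().getBytes(StandardCharsets.UTF_8)));
    records.add(record);
    return records;
}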
In the getBatchRecords method, you create the jsonObject and put data into it, then use the jsonObject to create a record. Each record is added to a list of records, which the method returns.
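Firehose does not insert a separator between the records it delivers, so a common approach is to end each payload with a delimiter such as a newline. Below is a minimal JDK-only sketch of building several such payloads. The class DelimitedRecordsDemo and the method buildPayloads are hypothetical, and the hand-built JSON string stands in for JSONObject on the assumption that the values need no escaping.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: one newline-terminated JSON payload per user id.
final class DelimitedRecordsDemo {
    static List<ByteBuffer> buildPayloads(List<String> userIds) {
        List<ByteBuffer> payloads = new ArrayList<>();
        for (String userId : userIds) {
            // Assumption: userId contains no characters that need JSON escaping.
            String json = "{\"userid\":\"" + userId + "\"}\n";
            payloads.add(ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8)));
        }
        return payloads;
    }

    public static void main(String[] args) {
        List<ByteBuffer> payloads =
            buildPayloads(Arrays.asList("userid_1", "userid_2", "userid_3"));
        for (ByteBuffer payload : payloads) {
            // Decode a duplicate so the payload itself stays at position 0.
            System.out.print(StandardCharsets.UTF_8.decode(payload.duplicate()));
        }
    }
}
```

Each ByteBuffer produced this way could be passed to withData when building the records for the batch.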
PutRecordBatchResult putRecordBatchResult = client.putRecordBatch(putRecordBatchRequest);
for (int i = 0; i < putRecordBatchResult.getRequestResponses().size(); i++) {
    System.out.println("Put Batch Request Record ID :" + i + ": "
        + putRecordBatchResult.getRequestResponses().get(i).getRecordId());
}
The putRecordBatch call writes multiple records into the delivery stream and returns a PutRecordBatchResult. The loop then prints the record ID of each record in the batch, one line per record.
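A batch call can succeed as a whole while individual records are rejected. In that case the response entry for a rejected record carries an error code instead of a record ID, and the entries come back in the same order as the records in the request. The sketch below shows one way to find the positions that need to be resent. It uses a hypothetical stand-in class, Entry, in place of the SDK's response entry type so that it runs without the SDK, and the error code shown is only a sample value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: find which records of a batch were rejected.
final class BatchRetryDemo {
    // Hypothetical stand-in for one per-record response entry.
    static final class Entry {
        final String recordId;  // non-null when the record was accepted
        final String errorCode; // non-null when the record was rejected

        Entry(String recordId, String errorCode) {
            this.recordId = recordId;
            this.errorCode = errorCode;
        }
    }

    // Entries are in request order, so an index in the result identifies
    // the request record that should be sent again.
    static List<Integer> failedIndexes(List<Entry> entries) {
        List<Integer> failed = new ArrayList<>();
        for (int i = 0; i < entries.size(); i++) {
            if (entries.get(i).errorCode != null) {
                failed.add(i);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<Entry> entries = Arrays.asList(
            new Entry("id-0", null),
            new Entry(null, "ServiceUnavailableException"), // sample error code
            new Entry("id-2", null));
        System.out.println("Records to resend: " + failedIndexes(entries)); // [1]
    }
}
```

With the real SDK types, the same check would read getErrorCode() on each entry returned by getRequestResponses(), and getFailedPutCount() on the result tells you whether the scan is needed at all.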
