What Is Kinesis?
Amazon Kinesis Data Streams is a managed service for real-time data streaming. You write (produce) records to a stream, and one or more consumers read and process them in order within each shard. Think of it as a durable, ordered log, similar in concept to Apache Kafka but fully managed.
Records stay in the stream for 24 hours (configurable up to 365 days), so multiple consumers can process the same data independently at different speeds.
When to Use It from .NET
- Real-time event processing (clickstreams, IoT telemetry, transaction logs)
- Decoupling high-volume producers from slower consumers
- Ordered processing per partition key (all records for a key land on the same shard)
- Fan-out: multiple consumers independently reading the same stream
If you just need to dequeue messages one at a time without ordering guarantees, SQS is simpler. If you need content-based routing, EventBridge is a better fit. Kinesis is for when order matters and volume is high.
Producing Records
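using Amazon.Kinesis;
using Amazon.Kinesis.Model;
using System.IO;
using System.Text.Json;
using System.Text.Json.Serialization;

// Uses the default credential and region resolution chain
var client = new AmazonKinesisClient();

var order = new OrderEvent("ORD-123", "CUST-456", 250.00m);

await client.PutRecordAsync(new PutRecordRequest
{
    StreamName = "order-events",
    PartitionKey = order.CustomerId, // hashed to pick the shard: same customer, same shard
    Data = new MemoryStream(
        JsonSerializer.SerializeToUtf8Bytes(order, AppJsonContext.Default.OrderEvent)
    ),
});

// Event payload. "Amount" is an illustrative name for the third field.
public record OrderEvent(string OrderId, string CustomerId, decimal Amount);

// Source-generated serialization metadata for the types this app writes
[JsonSerializable(typeof(OrderEvent))]
public partial class AppJsonContext : JsonSerializerContext { }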
Batch producing
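// `orders` is an IEnumerable<OrderEvent>; PutRecords accepts at most 500 entries per call
var records = orders.Select(o => new PutRecordsRequestEntry
{
    PartitionKey = o.CustomerId,
    Data = new MemoryStream(
        JsonSerializer.SerializeToUtf8Bytes(o, AppJsonContext.Default.OrderEvent)
    ),
}).ToList();

var response = await client.PutRecordsAsync(new PutRecordsRequest
{
    StreamName = "order-events",
    Records = records,
});

// PutRecords is not all-or-nothing: individual entries can fail (for example with
// ProvisionedThroughputExceededException) while the rest succeed.
// response.Records lines up index-for-index with the request entries.
if (response.FailedRecordCount > 0)
{
    var failed = new List<PutRecordsRequestEntry>();
    for (int i = 0; i < response.Records.Count; i++)
    {
        if (!string.IsNullOrEmpty(response.Records[i].ErrorCode))
        {
            failed.Add(records[i]); // re-send these, ideally with backoff
        }
    }
}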
Note the use of a System.Text.Json source-generated context (AppJsonContext.Default.OrderEvent) instead of reflection-based serialization. Because the serialization code is generated at compile time, this works with trimming and Native AOT.
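The failure-handling loop above leaves the actual retry open. The sketch below shows one way to close it, written against a hypothetical `sendBatch` delegate instead of the SDK so that it runs without AWS credentials. It splits the input into chunks of at most 500 entries (the PutRecords per-call limit), re-sends only the entries whose result came back as failed, and backs off exponentially between rounds. `PutAllWithRetriesAsync`, `sendBatch`, `maxAttempts`, and `baseDelayMs` are illustrative names, not part of the AWS SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// PutRecords accepts at most 500 entries per call.
const int MaxEntriesPerCall = 500;

// Sends every entry, retrying only the ones that failed, with exponential backoff between rounds.
// sendBatch is a hypothetical stand-in for PutRecordsAsync: it must return one success flag per
// entry, in request order (like PutRecordsResponse.Records). Returns the entries still failing at the end.
async Task<List<T>> PutAllWithRetriesAsync<T>(
    IReadOnlyList<T> entries,
    Func<IReadOnlyList<T>, Task<IReadOnlyList<bool>>> sendBatch,
    int maxAttempts = 5,
    int baseDelayMs = 100)
{
    var pending = entries.ToList();
    for (int attempt = 0; attempt < maxAttempts && pending.Count > 0; attempt++)
    {
        if (attempt > 0)
        {
            // Backoff before each retry round: baseDelayMs, then 2x, 4x, ...
            await Task.Delay(baseDelayMs * (1 << (attempt - 1)));
        }

        var stillFailing = new List<T>();
        foreach (T[] chunk in pending.Chunk(MaxEntriesPerCall))
        {
            IReadOnlyList<bool> succeeded = await sendBatch(chunk);
            for (int i = 0; i < chunk.Length; i++)
            {
                if (!succeeded[i]) stillFailing.Add(chunk[i]);
            }
        }
        pending = stillFailing;
    }
    return pending;
}
```

With the real SDK, `sendBatch` would call `client.PutRecordsAsync` with the chunk as `Records` and map each `response.Records[i].ErrorCode` to a success flag. One caveat: re-sending failed entries after later ones have succeeded can reorder records that share a partition key.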
Consuming with Lambda
The most common pattern for .NET on AWS is using Lambda as a Kinesis consumer. Lambda polls the stream for you through an event source mapping and invokes your function with batches of records from each shard:
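using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using System.Text.Json;
using System.Text.Json.Serialization;

// The Lambda serializer uses AppJsonContext to deserialize the incoming KinesisEvent itself
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.SourceGeneratorLambdaJsonSerializer<MyApp.AppJsonContext>))]

namespace MyApp; // must match the handler string "MyApp::MyApp.KinesisProcessor::Handler"

public class KinesisProcessor
{
    public async Task Handler(KinesisEvent kinesisEvent, ILambdaContext context)
    {
        foreach (var record in kinesisEvent.Records)
        {
            // Kinesis.Data is a MemoryStream holding the already base64-decoded payload
            var order = JsonSerializer.Deserialize(record.Kinesis.Data, AppJsonContext.Default.OrderEvent)
                ?? throw new InvalidOperationException($"Empty record {record.Kinesis.SequenceNumber}");

            context.Logger.LogInformation($"Processing order {order.OrderId}");
            await ProcessOrder(order);
        }
    }

    // Placeholder for your business logic
    private static Task ProcessOrder(OrderEvent order) => Task.CompletedTask;
}

// Register every type that gets deserialized, including the Lambda event type.
// OrderEvent is the same record type the producer serializes.
[JsonSerializable(typeof(KinesisEvent))]
[JsonSerializable(typeof(OrderEvent))]
public partial class AppJsonContext : JsonSerializerContext { }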
Error handling
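By default, if your function throws on any record, Lambda retries the entire batch and keeps retrying until the records expire from the stream. Because a shard is processed in order, one bad record (a poison pill) can block that shard for the whole retention period.
Configure these options on the event source (C# CDK property names shown) to contain it:
- BisectBatchOnError: on failure, splits the batch in half and retries each half separately, isolating the bad record
- RetryAttempts: caps the number of retries before Lambda gives up on the batch (the underlying setting is MaximumRetryAttempts)
- OnFailure: sends details about the failed batch (shard ID and sequence-number range, not the record payloads) to an SQS queue or SNS topic so you can inspect or replay it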
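To see why bisecting helps, here is a simplified, synchronous model of the idea. It is not Lambda's actual implementation (real retries are asynchronous and also bounded by the retry limit), and `IsolatePoisonRecords` and `process` are illustrative names. A failing batch is split in half recursively until the records that still fail are isolated on their own:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified model of bisect-on-error (not Lambda's implementation).
// Runs `process` on the batch; on an exception, splits the batch in half and retries each half,
// recursing until the records that fail on their own are isolated. Returns those records.
List<T> IsolatePoisonRecords<T>(IReadOnlyList<T> batch, Action<IReadOnlyList<T>> process)
{
    try
    {
        process(batch);
        return new List<T>(); // the whole (sub)batch succeeded
    }
    catch (Exception)
    {
        if (batch.Count <= 1)
        {
            return batch.ToList(); // cannot split further: this record is the poison pill
        }
        int mid = batch.Count / 2;
        var failures = IsolatePoisonRecords(batch.Take(mid).ToList(), process);
        failures.AddRange(IsolatePoisonRecords(batch.Skip(mid).ToList(), process));
        return failures;
    }
}
```

Notice that healthy records sharing a half with the bad one are handed to `process` again on every split, so Kinesis consumers should be idempotent.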
CDK Setup (C#)
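using Amazon.CDK;
using Amazon.CDK.AWS.Kinesis;
using Amazon.CDK.AWS.Lambda;
using Amazon.CDK.AWS.Lambda.EventSources;
using Amazon.CDK.AWS.SQS;

// Inside your Stack constructor

// Create the stream (fully qualified to avoid a clash with System.IO.Stream)
var stream = new Amazon.CDK.AWS.Kinesis.Stream(this, "OrderEvents", new StreamProps
{
    StreamName = "order-events",
    ShardCount = 2, // start with 2, scale up as needed
    RetentionPeriod = Duration.Hours(48),
});

// Receives metadata about batches that exhausted their retries
var deadLetterQueue = new Queue(this, "OrderEventsDlq");

// Lambda consumer
var processor = new Function(this, "KinesisProcessor", new FunctionProps
{
    Runtime = Runtime.DOTNET_8,
    Handler = "MyApp::MyApp.KinesisProcessor::Handler",
    Code = Code.FromAsset("./src/KinesisProcessor/publish"),
    MemorySize = 512,
    Timeout = Duration.Minutes(1),
});

// Wire up the event source (this also grants the function read access to the stream)
processor.AddEventSource(new KinesisEventSource(stream, new KinesisEventSourceProps
{
    StartingPosition = StartingPosition.TRIM_HORIZON, // start from the oldest retained record
    BatchSize = 100,
    MaxBatchingWindow = Duration.Seconds(5),
    BisectBatchOnError = true,
    RetryAttempts = 3,
    OnFailure = new SqsDlq(deadLetterQueue), // stream sources take SqsDlq/SnsDlq, not SqsDestination
    ParallelizationFactor = 2, // up to 2 concurrent batches per shard; order is kept per partition key
}));

// Grant the producer write access (producerFunction is defined elsewhere in the stack)
stream.GrantWrite(producerFunction);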
Design Considerations
Shard count and partition keys
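Each shard supports up to 1 MB/s (or 1,000 records/s) of writes and 2 MB/s of reads. Kinesis hashes each partition key to decide which shard a record lands on, so choose a key that spreads traffic evenly. Customer ID works well if you have many customers of similar volume, but every record for one customer goes to the same shard, so a single high-volume customer will hot-spot that shard.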
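Both rules can be made concrete with a small, simplified model (not the service's code). It estimates the minimum shard count from the per-shard write limits, and shows how a partition key picks a shard: Kinesis MD5-hashes the key into a 128-bit integer and routes the record to the shard whose hash key range contains it. The sketch assumes the hash is read as an unsigned big-endian integer and that the shards split the range 0 to 2^128 - 1 evenly, as when a stream is created with a fixed shard count (ranges after resharding can differ). `MinShardsForWrites`, `HashKeyOf`, and `ShardIndexFor` are illustrative names.

```csharp
using System;
using System.Numerics;
using System.Security.Cryptography;
using System.Text;

// Minimum shards for the write side: each shard takes up to 1 MB/s and 1,000 records/s.
int MinShardsForWrites(double writeMBps, int recordsPerSecond) =>
    Math.Max(1, Math.Max(
        (int)Math.Ceiling(writeMBps / 1.0),
        (int)Math.Ceiling(recordsPerSecond / 1000.0)));

// MD5 of the partition key, read as an unsigned big-endian 128-bit integer (assumption).
BigInteger HashKeyOf(string partitionKey) =>
    new BigInteger(MD5.HashData(Encoding.UTF8.GetBytes(partitionKey)), isUnsigned: true, isBigEndian: true);

// Index of the shard owning the key, assuming shardCount evenly split hash key ranges.
int ShardIndexFor(string partitionKey, int shardCount) =>
    (int)(HashKeyOf(partitionKey) * shardCount / BigInteger.Pow(2, 128));

foreach (var key in new[] { "CUST-456", "CUST-457", "CUST-458" })
{
    Console.WriteLine($"{key} -> shard {ShardIndexFor(key, 2)}");
}
Console.WriteLine(MinShardsForWrites(writeMBps: 3.5, recordsPerSecond: 2000)); // 4
```

Because the mapping depends only on the key, every record for CUST-456 lands on the same shard no matter how many other customers exist, which is exactly how one very busy customer creates a hot shard.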
Enhanced fan-out
Standard consumers share each shard's 2 MB/s of read throughput (and its limit of 5 GetRecords calls per second). If you have several consumers and need dedicated throughput, use enhanced fan-out: each registered consumer gets its own 2 MB/s per shard, pushed over HTTP/2. It costs more but eliminates read contention.
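As a rough, idealized illustration of the difference (it assumes shared consumers split a shard's read capacity evenly, which real polling does not guarantee), the per-consumer read rate for one shard works out as follows. `PerConsumerReadMBps` is an illustrative name.

```csharp
using System;

// Idealized read throughput per consumer for a single shard, in MB/s.
// Standard (shared) consumers split the shard's 2 MB/s; each enhanced fan-out consumer gets its own 2 MB/s.
double PerConsumerReadMBps(int consumers, bool enhancedFanOut)
{
    if (consumers < 1) throw new ArgumentOutOfRangeException(nameof(consumers));
    return enhancedFanOut ? 2.0 : 2.0 / consumers;
}

Console.WriteLine(PerConsumerReadMBps(4, enhancedFanOut: false));
Console.WriteLine(PerConsumerReadMBps(4, enhancedFanOut: true));
```

With five shared consumers, each would average about 0.4 MB/s and they would also compete for the shard's 5 GetRecords calls per second; with enhanced fan-out each still gets the full 2 MB/s.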
Ordering guarantees
Records in a shard are ordered by sequence number, and all records with the same partition key go to the same shard, so they are read in order. If strict ordering across all records matters, you need a single shard (which limits throughput). Usually you only need ordering per entity (per customer, per device), which partition keys handle naturally.
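Within a single batch (for example, the records in one Lambda invocation), per-entity ordering also lets you add concurrency safely: records for different partition keys can be handled in parallel as long as records that share a key are handled one after another. A minimal sketch of that pattern follows; `ProcessPerKeyInOrderAsync`, `partitionKeyOf`, and `handle` are illustrative names, not part of any AWS library.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Handles records for different partition keys concurrently,
// but records that share a key strictly one after another, in batch order.
async Task ProcessPerKeyInOrderAsync<T>(
    IEnumerable<T> records,
    Func<T, string> partitionKeyOf,
    Func<T, Task> handle)
{
    // GroupBy keeps each group's elements in their original order.
    var perKey = records.GroupBy(partitionKeyOf);
    await Task.WhenAll(perKey.Select(async group =>
    {
        foreach (var record in group)
        {
            await handle(record); // the next record for this key starts only after this one finishes
        }
    }));
}
```

In the Lambda handler, this could be called with `kinesisEvent.Records`, `r => r.Kinesis.PartitionKey`, and a delegate that deserializes and processes each record.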