Home β€Ί .NET on AWS β€Ί Using Kinesis with .NET

Using Kinesis with .NET

Streaming data with Amazon Kinesis from .NET: producing records, Lambda consumers, and CDK setup.

What Is Kinesis?

Amazon Kinesis Data Streams is a managed service for real-time data streaming. You produce records to a stream, and one or more consumers process them in order. Think of it as a durable, ordered log. Similar in concept to Kafka, but fully managed.

Records stay in the stream for 24 hours (configurable up to 365 days), so multiple consumers can process the same data independently at different speeds.

When to Use It from .NET

  • Real-time event processing (clickstreams, IoT telemetry, transaction logs)
  • Decoupling high-volume producers from slower consumers
  • Ordered processing within a partition (shard)
  • Fan-out: multiple consumers independently reading the same stream

If you just need to dequeue messages one at a time without ordering guarantees, SQS is simpler. If you need content-based routing, EventBridge is a better fit. Kinesis is for when order matters and volume is high.

Producing Records

using Amazon.Kinesis;
using Amazon.Kinesis.Model;
using System.Text;
using System.Text.Json;

var client = new AmazonKinesisClient();

var order = new OrderEvent("ORD-123", "CUST-456", 250.00m);

await client.PutRecordAsync(new PutRecordRequest
{
    StreamName = "order-events",
    PartitionKey = order.CustomerId, // determines shard placement
    Data = new MemoryStream(
        JsonSerializer.SerializeToUtf8Bytes(order, AppJsonContext.Default.OrderEvent)
    ),
});

Batch producing

var records = orders.Select(o => new PutRecordsRequestEntry
{
    PartitionKey = o.CustomerId,
    Data = new MemoryStream(
        JsonSerializer.SerializeToUtf8Bytes(o, AppJsonContext.Default.OrderEvent)
    ),
}).ToList();

var response = await client.PutRecordsAsync(new PutRecordsRequest
{
    StreamName = "order-events",
    Records = records,
});

// Handle failures
if (response.FailedRecordCount > 0)
{
    for (int i = 0; i < response.Records.Count; i++)
    {
        if (!string.IsNullOrEmpty(response.Records[i].ErrorCode))
        {
            // Retry records[i]
        }
    }
}

Note the use of System.Text.Json source-generated context (AppJsonContext.Default.OrderEvent) instead of reflection-based serialization. This is AoT compatible.

Consuming with Lambda

The most common pattern for .NET on AWS is using Lambda as a Kinesis consumer. Lambda polls the stream for you and invokes your function with batches of records:

using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using System.Text.Json;

[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.SourceGeneratorLambdaJsonSerializer<AppJsonContext>))]

public class KinesisProcessor
{
    public async Task Handler(KinesisEvent kinesisEvent, ILambdaContext context)
    {
        foreach (var record in kinesisEvent.Records)
        {
            var data = record.Kinesis.Data;
            using var reader = new StreamReader(data);
            var json = await reader.ReadToEndAsync();
            
            var order = JsonSerializer.Deserialize(json, AppJsonContext.Default.OrderEvent);
            
            context.Logger.LogInformation($"Processing order {order.OrderId}");
            await ProcessOrder(order);
        }
    }
}

Error handling

By default, if your Lambda throws on any record, the entire batch retries from the beginning. This can cause poison-pill scenarios where one bad record blocks the shard.

Configure these in CDK to handle it:

  • bisectBatchOnError: splits the failing batch in half and retries each half
  • maxRetryAttempts: limits retries before moving to a DLQ
  • onFailure: sends failed records to SQS or SNS

CDK Setup (C#)

using Amazon.CDK;
using Amazon.CDK.AWS.Kinesis;
using Amazon.CDK.AWS.Lambda;
using Amazon.CDK.AWS.Lambda.EventSources;

// Create the stream
var stream = new Stream(this, "OrderEvents", new StreamProps
{
    StreamName = "order-events",
    ShardCount = 2, // start with 2, scale up as needed
    RetentionPeriod = Duration.Hours(48),
});

// Lambda consumer
var processor = new Function(this, "KinesisProcessor", new FunctionProps
{
    Runtime = Runtime.DOTNET_8,
    Handler = "MyApp::MyApp.KinesisProcessor::Handler",
    Code = Code.FromAsset("./src/KinesisProcessor/publish"),
    MemorySize = 512,
    Timeout = Duration.Minutes(1),
});

// Wire up the event source
processor.AddEventSource(new KinesisEventSource(stream, new KinesisEventSourceProps
{
    StartingPosition = StartingPosition.TRIM_HORIZON,
    BatchSize = 100,
    MaxBatchingWindow = Duration.Seconds(5),
    BisectBatchOnError = true,
    RetryAttempts = 3,
    OnFailure = new SqsDestination(deadLetterQueue),
    ParallelizationFactor = 2,
}));

// Grant the producer access
stream.GrantWrite(producerFunction);

Design Considerations

Shard count and partition keys

Each shard handles 1 MB/s writes, 2 MB/s reads. Choose a partition key that distributes evenly across shards. Customer ID works well if you have many customers. A single high-volume customer will hot-spot one shard.

Enhanced fan-out

Standard consumers share the 2 MB/s read throughput per shard. If you have multiple consumers and need dedicated throughput, use enhanced fan-out (each consumer gets its own 2 MB/s pipe). Costs more but eliminates read contention.

Ordering guarantees

Records with the same partition key arrive in order within a shard. If strict ordering across all records matters, you need a single shard (which limits throughput). Usually you only need ordering per-entity (per customer, per device), which partition keys handle naturally.

Further Reading

Looking for hands-on help? View my .NET on AWS services β†’

Building real-time data pipelines?

Drop me a message β€” I typically respond within one business day.