Connectors
Connectors are the bridge between Flink and external systems. From Kafka sources with exactly-once guarantees to async I/O for enrichment lookups, connectors determine how data flows in and out of your streaming pipeline.
Kafka Connector
Kafka is the most common source and sink for Flink jobs. The connector supports exactly-once semantics (via transactions for sinks), per-partition watermarks, and flexible offset management.
    // Kafka Source (Flink 1.14+ unified source API)
    KafkaSource<Event> source = KafkaSource.<Event>builder()
        .setBootstrapServers("kafka-1:9092,kafka-2:9092")
        .setTopics("user-events", "order-events")  // multiple topics
        .setGroupId("flink-analytics-v2")
        .setStartingOffsets(OffsetsInitializer.committedOffsets(
            OffsetResetStrategy.EARLIEST))
        .setDeserializer(KafkaRecordDeserializationSchema.of(
            new EventDeserializer()))
        .setProperty("partition.discovery.interval.ms", "30000")
        .build();

    // With watermark strategy
    DataStream<Event> stream = env.fromSource(
        source,
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(
                Duration.ofSeconds(5))
            .withTimestampAssigner((event, ts) -> event.getTimestamp())
            .withIdleness(Duration.ofMinutes(2)),
        "Kafka Source");

    // Kafka Sink with exactly-once
    KafkaSink<Result> sink = KafkaSink.<Result>builder()
        .setBootstrapServers("kafka-1:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("results")
            .setValueSerializationSchema(new ResultSerializer())
            .build())
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("analytics-job")
        .setProperty("transaction.timeout.ms", "600000")
        .build();

    stream.sinkTo(sink);
Kafka Connector Features
- ✅ Exactly-once source: checkpoint stores Kafka offsets, rewinds on failure
- ✅ Exactly-once sink: two-phase commit with Kafka transactions
- ✅ Per-partition watermarks: watermarks are tracked per partition and merged, so uneven consumption across partitions does not distort event time; with withIdleness, an idle partition doesn't hold back the others
- ✅ Dynamic partition discovery: detects new partitions at runtime
- ✅ Multiple topic subscription: regex patterns or explicit list
- ✅ Offset initialization: earliest, latest, committed, timestamp-based
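To make the per-partition watermark idea concrete, here is a minimal plain-Java sketch with no Flink dependency. It assumes a simplified model: each partition tracks its own maximum timestamp, and the combined watermark is the minimum over partitions that are not marked idle. The class `PartitionWatermarks` and its methods are hypothetical names for illustration, not Flink's implementation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; not Flink's actual watermark code.
class PartitionWatermarks {
    private final long maxOutOfOrdernessMs;
    private final Map<Integer, Long> maxTimestampPerPartition = new HashMap<>();
    private final Set<Integer> idlePartitions = new HashSet<>();

    PartitionWatermarks(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Record an event timestamp; a partition that produces data is no longer idle.
    void onEvent(int partition, long timestampMs) {
        maxTimestampPerPartition.merge(partition, timestampMs, Math::max);
        idlePartitions.remove(partition);
    }

    // Exclude a partition from the combined watermark until it produces data again.
    void markIdle(int partition) {
        idlePartitions.add(partition);
    }

    // Minimum over non-idle partitions of (max timestamp - out-of-orderness bound).
    // Returns Long.MIN_VALUE when no active partition has produced data yet.
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (Map.Entry<Integer, Long> e : maxTimestampPerPartition.entrySet()) {
            if (idlePartitions.contains(e.getKey())) {
                continue;
            }
            min = Math.min(min, e.getValue() - maxOutOfOrdernessMs);
        }
        return min == Long.MAX_VALUE ? Long.MIN_VALUE : min;
    }
}
```

With a 5-second bound, a partition at timestamp 100,000 and another at 20,000 give a combined watermark of 15,000. Marking the lagging partition idle lets the watermark advance to 95,000.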
Filesystem Connector
The filesystem connector writes streaming data to files (S3, HDFS, local) with configurable rolling policies. It supports exactly-once via a two-phase commit protocol — files are written to a temporary location and moved to final location on checkpoint completion.
    // Streaming file sink with rolling policy
    FileSink<Event> fileSink = FileSink
        .forRowFormat(
            new Path("s3://data-lake/events/"),
            new SimpleStringEncoder<Event>("UTF-8"))
        .withRollingPolicy(DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofMinutes(15))   // new file every 15 min
            .withInactivityInterval(Duration.ofMinutes(5))  // or after 5 min idle
            .withMaxPartSize(MemorySize.ofMebiBytes(256))   // or at 256 MB
            .build())
        .withBucketAssigner(new DateTimeBucketAssigner<>(
            "'year='yyyy'/month='MM'/day='dd'/hour='HH"))
        .build();
    // Result: s3://data-lake/events/year=2024/month=03/day=15/hour=10/part-0-0

    // Bulk format (Parquet) for analytics
    FileSink<GenericRecord> parquetSink = FileSink
        .forBulkFormat(
            new Path("s3://data-lake/events-parquet/"),
            ParquetAvroWriters.forGenericRecord(schema))
        .withBucketAssigner(new DateTimeBucketAssigner<>())
        .build();

    // File lifecycle:
    // 1. In-progress: .part-0-0.inprogress (being written)
    // 2. Pending:     .part-0-0.pending (checkpoint taken, awaiting commit)
    // 3. Finished:    part-0-0 (checkpoint committed, final)
Exactly-Once File Writes
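The filesystem sink achieves exactly-once by writing to temporary (in-progress) files and publishing them under their final names only when a checkpoint completes. After a failure, Flink restores the last completed checkpoint: anything written after that checkpoint is discarded and rewritten during replay. This requires a filesystem that can publish a file atomically. HDFS supports atomic rename natively; S3 does not, so the sink depends on a commit mechanism layered on top (such as the S3A committer, or multipart-upload completion in Flink's Hadoop-based S3 filesystem).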
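The write-then-rename idea can be sketched on a local filesystem with the JDK alone. This is a minimal illustration, assuming a filesystem that supports `ATOMIC_MOVE`. The class `TwoPhaseFileWriter` and its methods are hypothetical and are not Flink's FileSink code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Hypothetical illustration of write-then-rename; not Flink's FileSink code.
class TwoPhaseFileWriter {
    private final Path inProgress;  // hidden temporary file
    private final Path finished;    // final, consumer-visible file

    TwoPhaseFileWriter(Path dir, String name) {
        try {
            Files.createDirectories(dir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        this.inProgress = dir.resolve("." + name + ".inprogress");
        this.finished = dir.resolve(name);
    }

    // Append a record to the in-progress file only.
    void write(String line) {
        try {
            Files.writeString(inProgress, line + "\n", StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Checkpoint completed: publish the file in one atomic step.
    void commit() {
        try {
            Files.move(inProgress, finished, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Failure before commit: drop uncommitted data so replay can rewrite it.
    void abort() {
        try {
            Files.deleteIfExists(inProgress);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    boolean isVisible() {
        return Files.exists(finished);
    }
}
```

A reader that lists only non-hidden files never sees partial output: the final name either does not exist yet or points at a complete file.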
JDBC Connector
The JDBC connector reads from and writes to relational databases. For sinks, it supports upsert mode (INSERT ON CONFLICT UPDATE) which provides idempotent writes — a simpler path to exactly-once than transactions.
    // JDBC Sink: upsert mode (idempotent exactly-once)
    SinkFunction<UserStats> jdbcSink = JdbcSink.sink(
        "INSERT INTO user_stats (user_id, event_count, last_seen) " +
        "VALUES (?, ?, ?) " +
        "ON CONFLICT (user_id) DO UPDATE SET " +
        "event_count = EXCLUDED.event_count, " +
        "last_seen = EXCLUDED.last_seen",
        (statement, stats) -> {
            statement.setString(1, stats.getUserId());
            statement.setLong(2, stats.getEventCount());
            statement.setTimestamp(3, stats.getLastSeen());
        },
        JdbcExecutionOptions.builder()
            .withBatchSize(1000)        // batch 1000 rows
            .withBatchIntervalMs(200)   // or flush every 200ms
            .withMaxRetries(3)
            .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withUrl("jdbc:postgresql://host:5432/db")
            .withDriverName("org.postgresql.Driver")
            .withUsername("flink")
            .withPassword("...")
            .build()
    );

    // JDBC Source: read lookup table for enrichment
    // (Better to use Async I/O for streaming enrichment)
    DataStream<Row> lookupData = env.createInput(
        JdbcInputFormat.buildJdbcInputFormat()
            .setDrivername("org.postgresql.Driver")
            .setDBUrl("jdbc:postgresql://host:5432/db")
            .setQuery("SELECT id, name, category FROM products")
            .setRowTypeInfo(rowTypeInfo)
            .finish());
Upsert for Exactly-Once
JDBC upsert (INSERT ... ON CONFLICT ... DO UPDATE) provides idempotent writes: writing the same record twice produces the same result. Even if Flink reprocesses events after a failure, the database ends up in the correct state. This holds only when the statement sets absolute values; an increment such as event_count = event_count + 1 is not idempotent. Upsert is simpler than two-phase commit and works with any database that supports it.
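The idempotency argument can be checked with a small simulation. This sketch uses an in-memory map as a stand-in for the `user_stats` table, which is an assumption for illustration and involves no real database. It compares replaying upserts with replaying increments. `UpsertDemo` and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a table keyed by user_id.
class UpsertDemo {
    // Models INSERT ... ON CONFLICT DO UPDATE SET event_count = EXCLUDED.event_count.
    static Map<String, Long> applyUpserts(List<Map.Entry<String, Long>> writes) {
        Map<String, Long> table = new HashMap<>();
        for (Map.Entry<String, Long> w : writes) {
            table.put(w.getKey(), w.getValue()); // last write wins, absolute value
        }
        return table;
    }

    // Models UPDATE ... SET event_count = event_count + ?, which is NOT idempotent.
    static Map<String, Long> applyIncrements(List<Map.Entry<String, Long>> writes) {
        Map<String, Long> table = new HashMap<>();
        for (Map.Entry<String, Long> w : writes) {
            table.merge(w.getKey(), w.getValue(), Long::sum);
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> once = List.of(Map.entry("u1", 3L));
        // The same write delivered twice, as after a restart and replay.
        List<Map.Entry<String, Long>> replayed =
                List.of(Map.entry("u1", 3L), Map.entry("u1", 3L));

        System.out.println(applyUpserts(once).equals(applyUpserts(replayed)));       // true
        System.out.println(applyIncrements(once).equals(applyIncrements(replayed))); // false
    }
}
```

The upsert table is identical with or without the replay, while the increment table double-counts. This is why the sink statement should write the aggregate's current value rather than a delta.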
Elasticsearch Connector
The Elasticsearch connector provides a sink for indexing streaming data. It uses bulk requests for efficiency and supports upsert mode with document IDs for idempotent writes.
    // Elasticsearch Sink (Flink 1.16+)
    // Elasticsearch7SinkBuilder targets ES 7; Elasticsearch6SinkBuilder targets ES 6
    ElasticsearchSink<Event> esSink = new Elasticsearch7SinkBuilder<Event>()
        .setHosts(new HttpHost("es-host", 9200, "https"))
        .setEmitter((event, context, indexer) -> {
            indexer.add(new IndexRequest("events")
                .id(event.getId())  // document ID for upsert/idempotency
                .source(Map.of(
                    "user_id", event.getUserId(),
                    "action", event.getAction(),
                    "timestamp", event.getTimestamp(),
                    "metadata", event.getMetadata()
                )));
        })
        .setBulkFlushMaxActions(1000)  // flush every 1000 docs
        .setBulkFlushInterval(5000)    // or every 5 seconds
        .setBulkFlushMaxSizeMb(5)      // or at 5 MB
        .setBulkFlushBackoffStrategy(
            FlushBackoffType.EXPONENTIAL, 5, 1000)
        .build();

    stream.sinkTo(esSink);

    // With document ID → idempotent (exactly-once via upsert)
    // Without document ID → at-least-once (duplicates possible)
Kinesis & Cloud Connectors
Flink provides connectors for major cloud streaming services. These follow similar patterns to the Kafka connector but with cloud-specific features like enhanced fan-out and auto-scaling.
| Connector | Source | Sink | Exactly-Once | Notes |
|---|---|---|---|---|
| AWS Kinesis | ✅ | ✅ | Source only (checkpoint offsets) | Enhanced fan-out, shard discovery |
| AWS Kinesis Firehose | ❌ | ✅ | At-least-once | Direct to S3/Redshift/ES |
| Google Pub/Sub | ✅ | ✅ | Source only | Subscription-based |
| Azure Event Hubs | ✅ | ✅ | Source only | Kafka-compatible protocol |
    // AWS Kinesis Source
    KinesisStreamsSource<Event> kinesisSource = KinesisStreamsSource
        .<Event>builder()
        .setStreamArn("arn:aws:kinesis:us-east-1:123456:stream/events")
        .setDeserializationSchema(new EventDeserializer())
        .setStartingPosition(KinesisPosition.fromTimestamp(startTime))
        .build();

    // With enhanced fan-out (dedicated throughput per consumer)
    // Note: ConsumerConfigConstants is the property set of the legacy FlinkKinesisConsumer
    Properties consumerConfig = new Properties();
    consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "EFO");
    consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "flink-analytics");
Async I/O
Async I/O enables non-blocking external calls (database lookups, API calls, cache reads) without stalling the entire operator. Instead of waiting synchronously for each response, multiple requests are in-flight simultaneously, dramatically improving throughput.
The Restaurant Waiter
Synchronous I/O is like a waiter who takes one order, walks to the kitchen, waits for the food, delivers it, then takes the next order. Async I/O is like a waiter who takes multiple orders, sends them all to the kitchen, and delivers food as it comes out — handling many tables concurrently without idle waiting.
    // Async I/O for database enrichment
    DataStream<EnrichedEvent> enriched = AsyncDataStream
        .unorderedWait(
            events,
            new AsyncDatabaseLookup(),
            30, TimeUnit.SECONDS,  // timeout per request
            100                    // max concurrent requests
        );

    // Async function implementation
    public class AsyncDatabaseLookup
            extends RichAsyncFunction<Event, EnrichedEvent> {

        // Apache HttpAsyncClient; transient because it is created in open()
        private transient CloseableHttpAsyncClient client;

        @Override
        public void open(Configuration params) {
            client = HttpAsyncClients.createDefault();
            client.start();
        }

        @Override
        public void close() throws Exception {
            if (client != null) client.close();  // release the HTTP client
        }

        @Override
        public void asyncInvoke(Event event,
                                ResultFuture<EnrichedEvent> resultFuture) {
            // Non-blocking call; lookupUser is a helper (not shown) that uses client
            CompletableFuture<UserProfile> future = lookupUser(event.getUserId());
            future.thenAccept(profile -> {
                resultFuture.complete(Collections.singleton(
                    new EnrichedEvent(event, profile)));
            }).exceptionally(ex -> {
                resultFuture.complete(Collections.singleton(
                    new EnrichedEvent(event, null)));  // fallback
                return null;
            });
        }

        @Override
        public void timeout(Event event,
                            ResultFuture<EnrichedEvent> resultFuture) {
            // Handle timeout: emit with null enrichment
            resultFuture.complete(Collections.singleton(
                new EnrichedEvent(event, null)));
        }
    }
| Mode | Ordering | Throughput | Use Case |
|---|---|---|---|
| orderedWait | Preserves input order | Lower (waits for slow requests) | When output order must match input |
| unorderedWait | Results emitted as they complete | Higher (no head-of-line blocking) | When order doesn't matter (most cases) |
Prefer Unordered
Use unorderedWait unless you specifically need output order to match input order. Ordered mode suffers from head-of-line blocking — one slow request holds back all subsequent results. Unordered mode emits results as they complete, maximizing throughput.
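Head-of-line blocking can be shown without Flink. The sketch below is a simplified model using plain `CompletableFuture`; `OrderingDemo` is a hypothetical name, and this is not how Flink's operator is implemented. It registers the same three in-flight requests with an unordered emitter and an ordered emitter, then completes them out of order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the two emission modes; all completions happen on one thread.
class OrderingDemo {
    // Emits each result the moment its future completes.
    static List<String> unordered(List<CompletableFuture<String>> inFlight) {
        List<String> out = new ArrayList<>();
        for (CompletableFuture<String> f : inFlight) {
            f.thenAccept(out::add);
        }
        return out;
    }

    // Emits strictly in input order: result i is held until results 0..i-1 are out.
    static List<String> ordered(List<CompletableFuture<String>> inFlight) {
        List<String> out = new ArrayList<>();
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (CompletableFuture<String> f : inFlight) {
            chain = chain.thenCompose(ignored -> f.thenAccept(out::add));
        }
        return out;
    }

    public static void main(String[] args) {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        CompletableFuture<String> c = new CompletableFuture<>();
        List<CompletableFuture<String>> inFlight = List.of(a, b, c);

        List<String> unorderedOut = unordered(inFlight);
        List<String> orderedOut = ordered(inFlight);

        c.complete("C");  // fastest response
        a.complete("A");
        System.out.println(unorderedOut + " " + orderedOut); // [C, A] [A]

        b.complete("B");  // slow request finally returns
        System.out.println(unorderedOut + " " + orderedOut); // [C, A, B] [A, B, C]
    }
}
```

In the ordered list, "C" is ready first but is held until the slow request "B" completes. That delay is the head-of-line blocking described above.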
Custom Sources & Sinks
When no built-in connector exists for your system, you can build custom sources and sinks. Flink provides the Source and Sink interfaces (FLIP-27 and FLIP-143) for building production-grade connectors with checkpointing support.
    // Modern Source API (FLIP-27): split-based, checkpointable
    // Components:
    // 1. Source: entry point, creates enumerator and reader
    // 2. SplitEnumerator: discovers and assigns splits (partitions)
    // 3. SourceReader: reads data from assigned splits

    // Simple custom sink (SinkFunction: legacy but simpler)
    public class RedisSink extends RichSinkFunction<Event> {
        private transient JedisPool pool;

        @Override
        public void open(Configuration params) {
            pool = new JedisPool("redis-host", 6379);
        }

        @Override
        public void invoke(Event event, Context ctx) {
            try (Jedis jedis = pool.getResource()) {
                jedis.hset("events:" + event.getUserId(),
                    event.getId(), event.toJson());
                jedis.expire("events:" + event.getUserId(), 86400);
            }
        }

        @Override
        public void close() {
            if (pool != null) pool.close();
        }
    }

    // Usage
    stream.addSink(new RedisSink());
Custom Connector Checklist
- ✅ Checkpoint source state for exactly-once: snapshot splits in the SplitEnumerator and SourceReader (FLIP-27), or implement CheckpointedFunction in a legacy SourceFunction
- ✅ Handle backpressure: respect Flink's flow control signals
- ✅ Mark connection fields transient and create them in open(); the function object itself is serialized when the job is deployed
- ✅ Implement proper cleanup in close(): release connections, flush buffers
- ✅ Add metrics for monitoring (records processed, latency, errors)
- ✅ Test failure recovery: verify state is correctly restored
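The checklist item on transient connections can be illustrated with plain Java serialization. This minimal sketch assumes the function object is serialized before it reaches a worker. `TransientConnectionSketch`, `FakeConnection`, and `shipToWorker` are hypothetical names, and the round trip only approximates what happens when a job is deployed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a rich sink or source function.
class TransientConnectionSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // Stand-in for a real client; deliberately NOT Serializable.
    static class FakeConnection {
        final String host;
        FakeConnection(String host) { this.host = host; }
    }

    private final String host;                    // plain config, serialized with the function
    private transient FakeConnection connection;  // skipped by serialization

    TransientConnectionSketch(String host) { this.host = host; }

    // Mirrors a rich function's open(): build the connection on the worker.
    void open() { connection = new FakeConnection(host); }

    boolean isConnected() { return connection != null; }

    // Serialize then deserialize, roughly what shipping a function to a task involves.
    static TransientConnectionSketch shipToWorker(TransientConnectionSketch fn) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (TransientConnectionSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The deserialized copy arrives with a null connection and becomes usable only after `open()` runs. Without the `transient` keyword, serialization would fail with a `NotSerializableException` on `FakeConnection`.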
Interview Questions
Q: How does the Kafka connector achieve exactly-once for both source and sink?
A: Source: Kafka offsets are stored in Flink's checkpoint. Flink does not rely on the offsets in Kafka's __consumer_offsets topic for recovery; offsets committed there on checkpoint completion only expose progress for monitoring. On failure, Flink restores offsets from the checkpoint and replays from that position, so no events are lost or duplicated internally. Sink: uses Kafka transactions (two-phase commit). During a checkpoint, the sink flushes records and pre-commits the transaction. On checkpoint completion, the transaction is committed. On failure, the uncommitted transaction is aborted. Consumers using read_committed isolation never see uncommitted data.
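The sink side of this answer can be sketched as a small state machine. This is a simplified plain-Java model, not the Kafka connector's code: `TwoPhaseCommitSinkSketch` and its method names are hypothetical, and it ignores the case where a pre-committed transaction is re-committed after restoring a completed checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a two-phase-commit sink; lists stand in for Kafka transactions.
class TwoPhaseCommitSinkSketch {
    private final List<String> committed = new ArrayList<>(); // visible to read_committed consumers
    private List<String> openTxn = new ArrayList<>();         // records since the last checkpoint
    private List<String> preCommitted = null;                 // flushed, awaiting checkpoint completion

    void write(String record) {
        openTxn.add(record);
    }

    // Phase 1 (checkpoint barrier): flush and pre-commit, then start a new transaction.
    void snapshotState() {
        preCommitted = openTxn;
        openTxn = new ArrayList<>();
    }

    // Phase 2 (checkpoint completed everywhere): make the pre-committed records visible.
    void notifyCheckpointComplete() {
        if (preCommitted != null) {
            committed.addAll(preCommitted);
            preCommitted = null;
        }
    }

    // Failure before phase 2: abort everything that was not committed.
    void abortUncommitted() {
        openTxn = new ArrayList<>();
        preCommitted = null;
    }

    List<String> visible() {
        return List.copyOf(committed);
    }
}
```

Records become visible only after both phases succeed. If the job fails in between, the aborted records are produced again when the source replays from its checkpointed offsets.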
Q: What is Async I/O and why is it important for streaming enrichment?
A: Async I/O enables non-blocking external calls (database lookups, API calls) without stalling the operator. Without it, each event waits synchronously for the external response, so throughput is capped at 1/latency per parallel instance. With async I/O, multiple requests are in flight simultaneously (e.g., 100 concurrent), and throughput scales roughly with the number of in-flight requests until the external system becomes the bottleneck. Two modes: orderedWait (preserves order, lower throughput) and unorderedWait (results emitted as completed, higher throughput). Prefer unordered unless output order is required.
Q: How does the filesystem sink achieve exactly-once?
A: Three-phase file lifecycle: (1) In-progress: file being actively written. (2) Pending: checkpoint taken, file closed but not finalized. (3) Finished: checkpoint committed, file renamed to final name. On failure: files that were not part of the last completed checkpoint are discarded (never made visible), and in-progress files are truncated to the last checkpoint position. The key requirement: the filesystem must be able to publish a file atomically (HDFS natively via rename, S3 via a committer mechanism such as the S3A committer). This ensures consumers only see complete, committed files.
Q: When would you build a custom connector vs using Async I/O?
A: Custom connector (Source/Sink): when you need a proper streaming source (continuous reading with checkpointing) or a sink with delivery guarantees (exactly-once via 2PC or idempotent writes). Examples: reading from a proprietary message queue, writing to a custom storage system. Async I/O: for enrichment lookups during processing — you already have a stream and need to add data from an external system. It's not a source or sink, it's a transformation step. Example: looking up user profiles from Redis/DynamoDB to enrich events.
Common Mistakes
Synchronous external calls in map/process functions
Making blocking HTTP/database calls inside a map() or processElement(). Each call blocks the entire operator thread, reducing throughput to 1/latency. With 10ms latency, max throughput is 100 events/sec per parallel instance.
✅ Use AsyncDataStream.unorderedWait() for external lookups. This allows hundreds of concurrent requests per operator instance. For very high throughput, consider pre-loading data into Flink state (broadcast state pattern) instead of per-event lookups.
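The 1/latency limit generalizes to in-flight requests divided by latency. The following back-of-the-envelope helper is a rough upper bound, assuming the external system keeps up; `ThroughputEstimate` is a hypothetical name.

```java
// Hypothetical helper: rough upper bound on events/sec for one operator instance.
class ThroughputEstimate {
    // inFlight = requests outstanding at once (1 for a blocking call),
    // latencyMs = round-trip time of one request in milliseconds.
    static double maxEventsPerSecond(int inFlight, double latencyMs) {
        return inFlight * 1000.0 / latencyMs;
    }

    public static void main(String[] args) {
        System.out.println(maxEventsPerSecond(1, 10));   // blocking call: 100.0
        System.out.println(maxEventsPerSecond(100, 10)); // 100 concurrent: 10000.0
    }
}
```

At 10 ms latency, a blocking call tops out at 100 events/sec, while 100 concurrent requests raise the bound to 10,000 events/sec per parallel instance.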
Not setting Kafka transaction timeout correctly
Using exactly-once Kafka sink with default transaction.timeout.ms (60s). Checkpoints that take longer than 60s cause Kafka to abort the transaction, losing all pre-committed data.
✅ Set transaction.timeout.ms > checkpoint_interval + max_checkpoint_duration. Also ensure the Kafka broker's transaction.max.timeout.ms (15 minutes by default) is at least as high, because the broker rejects producers that request a longer timeout. Monitor checkpoint duration and set the timeout with a generous buffer.
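The rule can be turned into a small sanity check to run before deploying a job. This is a minimal sketch using `java.time.Duration`. `TransactionTimeoutCheck` is a hypothetical helper, and the interval and duration values in the example are assumptions chosen for illustration.

```java
import java.time.Duration;

// Hypothetical pre-deployment check for the Kafka sink's transaction timeout.
class TransactionTimeoutCheck {
    // Lower bound: a transaction must outlive one interval plus the slowest checkpoint.
    static Duration minimumTimeout(Duration checkpointInterval, Duration maxCheckpointDuration) {
        return checkpointInterval.plus(maxCheckpointDuration);
    }

    // Safe when the timeout exceeds the lower bound and the broker will accept it.
    static boolean isSafe(Duration txnTimeout, Duration checkpointInterval,
                          Duration maxCheckpointDuration, Duration brokerMaxTimeout) {
        boolean longEnough =
                txnTimeout.compareTo(minimumTimeout(checkpointInterval, maxCheckpointDuration)) > 0;
        boolean acceptedByBroker = txnTimeout.compareTo(brokerMaxTimeout) <= 0;
        return longEnough && acceptedByBroker;
    }

    public static void main(String[] args) {
        Duration interval = Duration.ofSeconds(60);      // assumed checkpoint interval
        Duration slowest = Duration.ofSeconds(30);       // assumed worst-case checkpoint duration
        Duration brokerMax = Duration.ofMinutes(15);     // transaction.max.timeout.ms

        System.out.println(isSafe(Duration.ofSeconds(60), interval, slowest, brokerMax)); // false
        System.out.println(isSafe(Duration.ofMinutes(10), interval, slowest, brokerMax)); // true
        System.out.println(isSafe(Duration.ofMinutes(20), interval, slowest, brokerMax)); // false
    }
}
```

A 60-second timeout is too short for these numbers. Ten minutes works, and twenty minutes exceeds what the broker allows.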
Not batching JDBC writes
Writing each event individually to the database. With 10,000 events/sec, that's 10,000 INSERT statements per second — overwhelming the database with connection overhead.
✅ Configure JdbcExecutionOptions with batch size (e.g., 1000) and flush interval (e.g., 200ms). This groups multiple events into single JDBC batch operations, cutting the number of database round trips by roughly the batch size.
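The effect of batching on round trips can be seen with a small size-triggered buffer. This is a simplified sketch: `BatchingBuffer` is a hypothetical class, the flush callback stands in for a JDBC `executeBatch()` call, and the time-based flush is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical size-triggered batch buffer; the time-based flush is left out.
class BatchingBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> flusher;  // stand-in for one database round trip
    private final List<T> buffer = new ArrayList<>();
    private int flushCount = 0;

    BatchingBuffer(int batchSize, Consumer<List<T>> flusher) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
        this.flusher = flusher;
    }

    // Buffer the item and flush once the batch is full.
    void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Send whatever is buffered as one batch; also call this on checkpoint and close.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flusher.accept(List.copyOf(buffer));
        buffer.clear();
        flushCount++;
    }

    int flushCount() {
        return flushCount;
    }
}
```

Feeding 10,000 events through a buffer with batch size 1000 produces 10 flushes instead of 10,000 individual statements. Any remainder must be flushed explicitly so records are not left behind at a checkpoint.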
Not handling connector failures gracefully
External system goes down (database, Elasticsearch) and the Flink job crashes repeatedly, burning through restart attempts until the job fails permanently.
✅ Implement retry logic with exponential backoff in custom sinks. For built-in connectors, configure retry options. Set tolerableCheckpointFailureNumber > 0 so transient failures don't kill the job. Use circuit breaker patterns for non-critical sinks.
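A retry loop with exponential backoff for a custom sink might look like the sketch below. It is a minimal version under stated assumptions: `RetrySketch` is a hypothetical helper, it retries only on `RuntimeException`, and it blocks the calling thread while waiting, which may not suit every sink.

```java
import java.util.function.Supplier;

// Hypothetical retry helper for calls to an external system.
class RetrySketch {
    // Delay doubles with each attempt (base, 2x, 4x, ...) and is capped at maxMillis.
    static long backoffMillis(int attempt, long baseMillis, long maxMillis) {
        int shift = Math.min(attempt, 30);  // keep the shift small to avoid overflow
        return Math.min(maxMillis, baseMillis * (1L << shift));
    }

    // Runs the call up to maxAttempts times, sleeping between failed attempts.
    static <T> T callWithRetry(Supplier<T> call, int maxAttempts,
                               long baseMillis, long maxMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt + 1 < maxAttempts) {
                    sleep(backoffMillis(attempt, baseMillis, maxMillis));
                }
            }
        }
        throw last;  // all attempts failed: surface the final error to Flink
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during backoff", e);
        }
    }
}
```

With a 100 ms base and a 5-second cap, the delays run 100, 200, 400, 800 ms and so on until they reach the cap. Once attempts are exhausted the exception propagates, so the job's restart strategy still applies.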