如何进行flink中的kafka源码分析
今天就跟大家聊聊有关如何进行flink中的kafka源码分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
瑞金ssl适用于网站、小程序/APP、API接口等需要进行数据传输应用场景,ssl证书未来市场广阔!成为创新互联的ssl证书销售渠道,可以享受市场价格4-6折优惠!如果有意向欢迎电话联系或者加微信:028-86922220(备注:SSL证书合作)期待与您的合作!
最近一直在弄flink sql相关的东西,第一阶段的目标是解决kafka的消费和写入问题。不过有些同学对这部分并不是很了解,今天我们来详细分析一下相关包中各个类的继承层次。




flink源码如下:
public class KafkaTableSourceFactory implements StreamTableSourceFactory{ private ConcurrentHashMap
kafkaTableSources = new ConcurrentHashMap<>(); @Override public Map requiredContext() { Map context = new HashMap<>(); context.put(CONNECTOR_TYPE(), KafkaConnectorDescriptor.CONNECTOR_TYPE); context.put(CONNECTOR_PROPERTY_VERSION(),String.valueOf(KafkaConnectorDescriptor.CONNECTOR_PROPERTY_VERSION)); return context; } @Override public List supportedProperties() { List properties = new ArrayList<>(); properties.add(KafkaConnectorDescriptor.DATABASE_KEY); properties.add(KafkaConnectorDescriptor.TABLE_KEY); return properties; } @Override public StreamTableSource createStreamTableSource(Map
properties) { //避免频繁的触发 是否需要加缓存 KafkaTableSource kafkaTableSource; String dataBase = properties.get(KafkaConnectorDescriptor.DATABASE_KEY); String table = properties.get(KafkaConnectorDescriptor.TABLE_KEY); if (!kafkaTableSources.containsKey(dataBase + table)) { Kafka08UDMPBTableSource.Builder builder = new Kafka08UDMPBTableSource.Builder(); kafkaTableSource = builder .cluster(dataBase) .subject(table) .build(); kafkaTableSources.put(dataBase + table,kafkaTableSource); } else { kafkaTableSource = kafkaTableSources.get(dataBase + table); } return kafkaTableSource; } }
/**
 * Kafka 0.8 table source that hands a `PBRowDeserializationSchema` (built from
 * `typeInformation`, `paramMap` and `entryClass`) to `KafkaTableSource`.
 *
 * The consumer it creates always starts from the earliest offset.
 */
class Kafka08PBTableSource protected(topic: String,
                                     properties: Properties,
                                     schema: TableSchema,
                                     typeInformation: TypeInformation[Row],
                                     paramMap: util.LinkedHashMap[String, AnyRef],
                                     entryClass: String)
  extends KafkaTableSource(
    schema,
    topic,
    properties,
    new PBRowDeserializationSchema(typeInformation, paramMap, entryClass)) {

  /**
   * Builds a Kafka 0.8 consumer for `topic`, configured to start from the earliest offset.
   *
   * @param topic                 topic to read from
   * @param properties            Kafka client properties
   * @param deserializationSchema schema used to turn records into rows
   * @return the configured consumer
   */
  override def createKafkaConsumer(topic: String,
                                   properties: Properties,
                                   deserializationSchema: DeserializationSchema[Row]): FlinkKafkaConsumerBase[Row] = {
    // Record the startup mode on the table source as well as on the consumer itself.
    setStartupMode(StartupMode.EARLIEST)
    val consumer = new FlinkKafkaConsumer08(topic, deserializationSchema, properties)
    consumer.setStartFromEarliest()
  }
}
下面用户自定义的kafka的sink类:
/**
 * User-defined Kafka 0.8 table sink with a fixed schema: the field names, field
 * types and serialization schema are all supplied through the constructor.
 *
 * When `partitioner` is empty a `FlinkFixedPartitioner` is used instead.
 */
class Kafka08UDMPBTableSink(topic: String,
                            properties: Properties,
                            partitioner: Optional[FlinkKafkaPartitioner[Row]],
                            paramMap: util.LinkedHashMap[String, AnyRef],
                            serializationSchema: SerializationSchema[Row],
                            fieldNames: Array[String],
                            fieldTypes: Array[TypeInformation[_]]
                           ) extends KafkaTableSink(topic, properties, partitioner.orElse(new FlinkFixedPartitioner[Row])) {

  /** Builds the Kafka 0.8 producer, falling back to a `FlinkFixedPartitioner` when none is given. */
  override def createKafkaProducer(topic: String,
                                   properties: Properties,
                                   serializationSchema: SerializationSchema[Row],
                                   partitioner: Optional[FlinkKafkaPartitioner[Row]]): SinkFunction[Row] = {
    new FlinkKafkaProducer08[Row](topic, serializationSchema, properties, partitioner.orElse(new FlinkFixedPartitioner[Row]))
  }

  /** Returns the constructor-supplied schema; `rowSchema` is not consulted. */
  override def createSerializationSchema(rowSchema: RowTypeInfo): SerializationSchema[Row] = serializationSchema

  /** Returns a new sink built from the same constructor arguments. */
  override def createCopy: Kafka08UDMPBTableSink =
    new Kafka08UDMPBTableSink(topic, properties, this.partitioner, paramMap, serializationSchema, fieldNames, fieldTypes)

  /**
   * Configures the sink with the constructor-supplied field names and types.
   * The arguments passed to this method are ignored.
   */
  override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): KafkaTableSink = {
    super.configure(this.fieldNames, this.fieldTypes)
  }

  /** Returns the names of the table fields. */
  override def getFieldNames: Array[String] = this.fieldNames

  /** Returns the types of the table fields. */
  override def getFieldTypes: Array[TypeInformation[_]] = this.fieldTypes

  /** Attaches a Kafka producer to `dataStream` as a sink named after this class and its fields. */
  override def emitDataStream(dataStream: DataStream[Row]): Unit = {
    val kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner)
    dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass, fieldNames))
  }
}

/**
 * Row schema that acts as both a serialization and a deserialization schema.
 *
 * NOTE(review): the {@code <Row>} type arguments were lost in the published
 * listing and are restored here on the assumption that the element type is
 * {@code Row} - confirm. Only the fields are shown; the interface methods are
 * not part of this excerpt.
 */
public class TrackRowDeserializationSchema implements SerializationSchema<Row>, DeserializationSchema<Row> {

    private static final long serialVersionUID = -2885556750743978636L;

    /** Type information describing the input type. */
    private TypeInformation<Row> typeInfo = null;

    // NOTE(review): key/value types are not visible in the listing; left raw.
    private LinkedHashMap paraMap;

    private String inSchema;

    private String outSchema;

    private String inClass;

    private String outClass;
}
public class TrackRowFormatFactory extends TableFormatFactoryBaseimplements SerializationSchemaFactory
, DeserializationSchemaFactory
{ public TrackRowFormatFactory() { super(TrackValidator.FORMAT_TYPE_VALUE, 1, false); } public TrackRowFormatFactory(String type, int version, boolean supportsSchemaDerivation) { super(type, version, supportsSchemaDerivation); } @Override protected List
supportedFormatProperties() { final List properties = new ArrayList<>(); properties.add(TrackValidator.FORMAT_IN_SCHEMA); properties.add(TrackValidator.FORMAT_IN_CLASS); properties.add(TrackValidator.FORMAT_OUT_CLASS); properties.add(TrackValidator.FORMAT_OUT_SCHEMA); properties.add(TrackValidator.FORMAT_TYPE_INFORMATION); properties.add(TrackValidator.FORMAT_TYPE_VALUE); return properties; } }
看完上述内容,你们对如何进行flink中的kafka源码分析有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注创新互联行业资讯频道,感谢大家的支持。
标题名称:如何进行flink中的kafka源码分析
标题网址:http://www.jxjierui.cn/article/jjossh.html


咨询
建站咨询
