Preface
This article looks at how a Flink Table Schema is defined.
Examples
Defining fields and types
```java
.withSchema(
  new Schema()
    .field("MyField1", Types.INT)     // required: specify the fields of the table (in this order)
    .field("MyField2", Types.STRING)
    .field("MyField3", Types.BOOLEAN)
)
```
- field defines a field's name and type; fields appear in the table in the order they are declared
Defining field attributes
```java
.withSchema(
  new Schema()
    .field("MyField1", Types.SQL_TIMESTAMP)
      .proctime()      // optional: declares this field as a processing-time attribute
    .field("MyField2", Types.SQL_TIMESTAMP)
      .rowtime(...)    // optional: declares this field as an event-time attribute
    .field("MyField3", Types.BOOLEAN)
      .from("mf3")     // optional: original field in the input that is referenced/aliased by this field
)
```
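- proctime declares a processing-time attribute, rowtime declares an event-time attribute, and from references (or aliases) a field of the input; each of them applies to the field declared immediately before it
Defining the rowtime attribute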
```java
// Converts an existing LONG or SQL_TIMESTAMP field in the input into the rowtime attribute.
.rowtime(
  new Rowtime()
    .timestampsFromField("ts_field")    // required: original field name in the input
)

// Converts the assigned timestamps from a DataStream API record into the rowtime attribute
// and thus preserves the assigned timestamps from the source.
// This requires a source that assigns timestamps (e.g., Kafka 0.10+).
.rowtime(
  new Rowtime()
    .timestampsFromSource()
)

// Sets a custom timestamp extractor to be used for the rowtime attribute.
// The extractor must extend `org.apache.flink.table.sources.tsextractors.TimestampExtractor`.
.rowtime(
  new Rowtime()
    .timestampsFromExtractor(...)
)
```
- The timestamps of the rowtime attribute are defined via timestampsFromField, timestampsFromSource, or timestampsFromExtractor
Defining watermark strategies
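To make the timestampsFromField case concrete, here is a minimal Java sketch of what turning an existing LONG or SQL_TIMESTAMP field into a rowtime value amounts to. ExistingFieldSketch is a hypothetical class, not Flink's ExistingField: the real extractor produces a cast expression that the planner evaluates, whereas this sketch simply reads the value out of a row modeled as a Map, assuming a LONG holds epoch milliseconds.

```java
import java.sql.Timestamp;
import java.util.Map;

// Hypothetical helper (NOT Flink's ExistingField): converts an existing LONG or
// SQL_TIMESTAMP field of a row into a rowtime value in epoch milliseconds.
final class ExistingFieldSketch {
    private final String fieldName;

    ExistingFieldSketch(String fieldName) {
        this.fieldName = fieldName;
    }

    long extract(Map<String, Object> row) {
        Object value = row.get(fieldName);
        if (value instanceof Long) {
            // Assumption: a LONG field already holds epoch milliseconds.
            return (Long) value;
        } else if (value instanceof Timestamp) {
            // SQL_TIMESTAMP: convert to epoch milliseconds.
            return ((Timestamp) value).getTime();
        }
        // Any other type cannot serve as the source of a rowtime attribute here.
        throw new IllegalArgumentException(
            "Field '" + fieldName + "' must be LONG or SQL_TIMESTAMP but was: " + value);
    }
}
```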
```java
// Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
// observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
// are not late.
.rowtime(
  new Rowtime()
    .watermarksPeriodicAscending()
)

// Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
// Emits watermarks which are the maximum observed timestamp minus the specified delay.
.rowtime(
  new Rowtime()
    .watermarksPeriodicBounded(2000)    // delay in milliseconds
)

// Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
// underlying DataStream API and thus preserves the assigned watermarks from the source.
.rowtime(
  new Rowtime()
    .watermarksFromSource()
)
```
- Watermark strategies are defined via watermarksPeriodicAscending, watermarksPeriodicBounded, or watermarksFromSource
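The two periodic strategies differ only in how far the watermark trails the largest timestamp seen so far. The hypothetical Java sketch below (PeriodicWatermarkSketch is not a Flink class) models that rule, assuming timestamps are epoch milliseconds: the ascending variant lags by 1 ms, the bounded variant lags by the configured delay. Before any row arrives it reports Long.MIN_VALUE, a choice made here only to avoid arithmetic underflow.

```java
// Hypothetical sketch (NOT Flink's AscendingTimestamps / BoundedOutOfOrderTimestamps):
// tracks the maximum observed timestamp and emits "max minus lag" as the watermark.
final class PeriodicWatermarkSketch {
    private final long lag;
    private long maxTimestamp;
    private boolean seenRow = false;

    private PeriodicWatermarkSketch(long lag) {
        this.lag = lag;
    }

    // Like watermarksPeriodicAscending(): max observed timestamp minus 1,
    // so a row whose timestamp equals the max is not late.
    static PeriodicWatermarkSketch ascending() {
        return new PeriodicWatermarkSketch(1);
    }

    // Like watermarksPeriodicBounded(delay): max observed timestamp minus the delay.
    static PeriodicWatermarkSketch bounded(long delayMillis) {
        return new PeriodicWatermarkSketch(delayMillis);
    }

    // Called for every row's rowtime value.
    void nextTimestamp(long timestamp) {
        if (!seenRow || timestamp > maxTimestamp) {
            maxTimestamp = timestamp;
            seenRow = true;
        }
    }

    // Called periodically; Long.MIN_VALUE means "no watermark progress yet".
    long currentWatermark() {
        return seenRow ? maxTimestamp - lag : Long.MIN_VALUE;
    }
}
```

For example, after rows with timestamps 1000, 3000 and 2000, the ascending variant reports 2999 while bounded(2000) reports 1000. The row at 2000 therefore counts as late under the ascending strategy but is still on time under the bounded one.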
StreamTableEnvironment.connect
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/StreamTableEnvironment.scala
```scala
abstract class StreamTableEnvironment(
    private[flink] val execEnv: StreamExecutionEnvironment,
    config: TableConfig)
  extends TableEnvironment(config) {

  //......

  def connect(connectorDescriptor: ConnectorDescriptor): StreamTableDescriptor = {
    new StreamTableDescriptor(this, connectorDescriptor)
  }

  //......
}
```
- The connect method of StreamTableEnvironment creates a StreamTableDescriptor
StreamTableDescriptor
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/descriptors/StreamTableDescriptor.scala
```scala
class StreamTableDescriptor(
    tableEnv: StreamTableEnvironment,
    connectorDescriptor: ConnectorDescriptor)
  extends ConnectTableDescriptor[StreamTableDescriptor](
    tableEnv,
    connectorDescriptor)
  with StreamableDescriptor[StreamTableDescriptor] {

  //......
}
```
- StreamTableDescriptor extends ConnectTableDescriptor (and mixes in StreamableDescriptor)
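Why does StreamTableDescriptor pass itself as the type argument, as in ConnectTableDescriptor[StreamTableDescriptor]? This lets inherited builder methods such as withSchema return the concrete StreamTableDescriptor, so stream-only methods can still be chained afterwards. A rough Java analogue follows. All class names are hypothetical, inAppendMode is modeled on the update-mode methods of StreamableDescriptor, and since Java has no self-types (the this: D => in the Scala code), an abstract self() method stands in for one.

```java
// Hypothetical, heavily simplified analogue of ConnectTableDescriptor[D <: ConnectTableDescriptor[D]].
abstract class ConnectDescriptorSketch<D extends ConnectDescriptorSketch<D>> {
    private String schema;

    // Java substitute for Scala's "this: D =>" self-type.
    protected abstract D self();

    // Mirrors withSchema(schema: Schema): D by returning the concrete subtype.
    D withSchema(String schema) {
        this.schema = schema;
        return self();
    }

    String schema() {
        return schema;
    }
}

// Hypothetical analogue of StreamTableDescriptor with a stream-only builder method.
final class StreamDescriptorSketch extends ConnectDescriptorSketch<StreamDescriptorSketch> {
    private String updateMode;

    @Override
    protected StreamDescriptorSketch self() {
        return this;
    }

    StreamDescriptorSketch inAppendMode() {
        updateMode = "append";
        return this;
    }

    String updateMode() {
        return updateMode;
    }
}
```

Because withSchema returns StreamDescriptorSketch rather than the base type, new StreamDescriptorSketch().withSchema(...).inAppendMode() compiles; with a non-generic base class returning the base type, the second call would not.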
ConnectTableDescriptor
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/descriptors/ConnectTableDescriptor.scala
```scala
abstract class ConnectTableDescriptor[D <: ConnectTableDescriptor[D]](
    private val tableEnv: TableEnvironment,
    private val connectorDescriptor: ConnectorDescriptor)
  extends TableDescriptor
  with SchematicDescriptor[D]
  with RegistrableDescriptor { this: D =>

  private var formatDescriptor: Option[FormatDescriptor] = None
  private var schemaDescriptor: Option[Schema] = None

  /**
    * Searches for the specified table source, configures it accordingly, and registers it as
    * a table under the given name.
    *
    * @param name table name to be registered in the table environment
    */
  override def registerTableSource(name: String): Unit = {
    val tableSource = TableFactoryUtil.findAndCreateTableSource(tableEnv, this)
    tableEnv.registerTableSource(name, tableSource)
  }

  /**
    * Searches for the specified table sink, configures it accordingly, and registers it as
    * a table under the given name.
    *
    * @param name table name to be registered in the table environment
    */
  override def registerTableSink(name: String): Unit = {
    val tableSink = TableFactoryUtil.findAndCreateTableSink(tableEnv, this)
    tableEnv.registerTableSink(name, tableSink)
  }

  /**
    * Searches for the specified table source and sink, configures them accordingly, and registers
    * them as a table under the given name.
    *
    * @param name table name to be registered in the table environment
    */
  override def registerTableSourceAndSink(name: String): Unit = {
    registerTableSource(name)
    registerTableSink(name)
  }

  /**
    * Specifies the format that defines how to read data from a connector.
    */
  override def withFormat(format: FormatDescriptor): D = {
    formatDescriptor = Some(format)
    this
  }

  /**
    * Specifies the resulting table schema.
    */
  override def withSchema(schema: Schema): D = {
    schemaDescriptor = Some(schema)
    this
  }

  // ----------------------------------------------------------------------------------------------

  /**
    * Converts this descriptor into a set of properties.
    */
  override def toProperties: util.Map[String, String] = {
    val properties = new DescriptorProperties()

    // this performs only basic validation
    // more validation can only happen within a factory
    if (connectorDescriptor.isFormatNeeded && formatDescriptor.isEmpty) {
      throw new ValidationException(
        s"The connector '$connectorDescriptor' requires a format description.")
    } else if (!connectorDescriptor.isFormatNeeded && formatDescriptor.isDefined) {
      throw new ValidationException(
        s"The connector '$connectorDescriptor' does not require a format description " +
          s"but '${formatDescriptor.get}' found.")
    }

    properties.putProperties(connectorDescriptor.toProperties)

    formatDescriptor.foreach(d => properties.putProperties(d.toProperties))
    schemaDescriptor.foreach(d => properties.putProperties(d.toProperties))

    properties.asMap()
  }
}
```
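- ConnectTableDescriptor provides the withSchema method, which takes a Schema, stores it, and returns the descriptor itself (of type D) so that calls can be chained; toProperties checks the format requirement and then merges the connector, format and schema properties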
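The validation in toProperties boils down to one rule: a format must be present exactly when the connector says it needs one, after which the connector, format and schema properties are merged into a single map. A minimal Java sketch of that rule follows. DescriptorPropertiesSketch is hypothetical; it throws IllegalStateException where Flink throws ValidationException, uses null to stand for an empty Option, and merges with plain putAll as a simplification of DescriptorProperties.putProperties.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the basic checks and merging in ConnectTableDescriptor.toProperties.
final class DescriptorPropertiesSketch {
    static Map<String, String> toProperties(
            boolean formatNeeded,
            Map<String, String> connectorProps,
            Map<String, String> formatProps,    // null stands for "no withFormat(...) call"
            Map<String, String> schemaProps) {  // null stands for "no withSchema(...) call"
        // A format must be given exactly when the connector requires one.
        if (formatNeeded && formatProps == null) {
            throw new IllegalStateException("The connector requires a format description.");
        } else if (!formatNeeded && formatProps != null) {
            throw new IllegalStateException("The connector does not require a format description.");
        }
        // Merge connector, then format, then schema properties (insertion order kept).
        Map<String, String> properties = new LinkedHashMap<>(connectorProps);
        if (formatProps != null) {
            properties.putAll(formatProps);
        }
        if (schemaProps != null) {
            properties.putAll(schemaProps);
        }
        return properties;
    }
}
```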
Schema
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/descriptors/Schema.scala
```scala
class Schema extends Descriptor {

  // maps a field name to a list of properties that describe type, origin, and the time attribute
  private val tableSchema = mutable.LinkedHashMap[String, mutable.LinkedHashMap[String, String]]()

  private var lastField: Option[String] = None

  def schema(schema: TableSchema): Schema = {
    tableSchema.clear()
    lastField = None
    schema.getFieldNames.zip(schema.getFieldTypes).foreach { case (n, t) =>
      field(n, t)
    }
    this
  }

  def field(fieldName: String, fieldType: TypeInformation[_]): Schema = {
    field(fieldName, TypeStringUtils.writeTypeInfo(fieldType))
    this
  }

  def field(fieldName: String, fieldType: String): Schema = {
    if (tableSchema.contains(fieldName)) {
      throw new ValidationException(s"Duplicate field name $fieldName.")
    }

    val fieldProperties = mutable.LinkedHashMap[String, String]()
    fieldProperties += (SCHEMA_TYPE -> fieldType)

    tableSchema += (fieldName -> fieldProperties)

    lastField = Some(fieldName)
    this
  }

  def from(originFieldName: String): Schema = {
    lastField match {
      case None => throw new ValidationException("No field previously defined. Use field() before.")
      case Some(f) =>
        tableSchema(f) += (SCHEMA_FROM -> originFieldName)
        lastField = None
    }
    this
  }

  def proctime(): Schema = {
    lastField match {
      case None => throw new ValidationException("No field defined previously. Use field() before.")
      case Some(f) =>
        tableSchema(f) += (SCHEMA_PROCTIME -> "true")
        lastField = None
    }
    this
  }

  def rowtime(rowtime: Rowtime): Schema = {
    lastField match {
      case None => throw new ValidationException("No field defined previously. Use field() before.")
      case Some(f) =>
        tableSchema(f) ++= rowtime.toProperties.asScala
        lastField = None
    }
    this
  }

  final override def toProperties: util.Map[String, String] = {
    val properties = new DescriptorProperties()
    properties.putIndexedVariableProperties(
      SCHEMA,
      tableSchema.toSeq.map { case (name, props) =>
        (Map(SCHEMA_NAME -> name) ++ props).asJava
      }.asJava
    )
    properties.asMap()
  }
}
```
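- Schema provides the field, from, proctime, and rowtime methods for defining the schema's attributes; from, proctime and rowtime modify the most recently declared field and then reset lastField, so each can be applied only once per field() call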
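The key design point in Schema is the lastField cursor: from, proctime and rowtime always modify the field declared just before them and then clear the cursor, so a second modifier without a new field() call fails. The hypothetical Java SchemaSketch below mirrors that bookkeeping together with the indexed flattening done by toProperties. The key layout schema.&lt;index&gt;.name and schema.&lt;index&gt;.&lt;property&gt; is an assumption based on the SCHEMA, SCHEMA_NAME and SCHEMA_TYPE constants and putIndexedVariableProperties; type strings are plain placeholders, and rowtime is left out for brevity.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical Java sketch of the bookkeeping in Flink's Schema descriptor.
final class SchemaSketch {
    // field name -> properties (type, from, proctime), kept in declaration order
    private final Map<String, Map<String, String>> tableSchema = new LinkedHashMap<>();
    // the field that the next from()/proctime() call will modify, or null
    private String lastField = null;

    SchemaSketch field(String name, String type) {
        if (tableSchema.containsKey(name)) {
            throw new IllegalArgumentException("Duplicate field name " + name + ".");
        }
        Map<String, String> props = new LinkedHashMap<>();
        props.put("type", type);
        tableSchema.put(name, props);
        lastField = name;
        return this;
    }

    SchemaSketch from(String originFieldName) {
        return modifyLastField("from", originFieldName);
    }

    SchemaSketch proctime() {
        return modifyLastField("proctime", "true");
    }

    // Attach a property to the last declared field, then clear the cursor.
    private SchemaSketch modifyLastField(String key, String value) {
        if (lastField == null) {
            throw new IllegalStateException("No field defined previously. Use field() before.");
        }
        tableSchema.get(lastField).put(key, value);
        lastField = null;
        return this;
    }

    // Flatten into indexed keys: schema.<i>.name and schema.<i>.<property>.
    Map<String, String> toProperties() {
        Map<String, String> properties = new LinkedHashMap<>();
        int i = 0;
        for (Map.Entry<String, Map<String, String>> field : tableSchema.entrySet()) {
            properties.put("schema." + i + ".name", field.getKey());
            for (Map.Entry<String, String> prop : field.getValue().entrySet()) {
                properties.put("schema." + i + "." + prop.getKey(), prop.getValue());
            }
            i++;
        }
        return properties;
    }
}
```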
Rowtime
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/descriptors/Rowtime.scala
```scala
class Rowtime extends Descriptor {

  private var timestampExtractor: Option[TimestampExtractor] = None
  private var watermarkStrategy: Option[WatermarkStrategy] = None

  def timestampsFromField(fieldName: String): Rowtime = {
    timestampExtractor = Some(new ExistingField(fieldName))
    this
  }

  def timestampsFromSource(): Rowtime = {
    timestampExtractor = Some(new StreamRecordTimestamp)
    this
  }

  def timestampsFromExtractor(extractor: TimestampExtractor): Rowtime = {
    timestampExtractor = Some(extractor)
    this
  }

  def watermarksPeriodicAscending(): Rowtime = {
    watermarkStrategy = Some(new AscendingTimestamps)
    this
  }

  def watermarksPeriodicBounded(delay: Long): Rowtime = {
    watermarkStrategy = Some(new BoundedOutOfOrderTimestamps(delay))
    this
  }

  def watermarksFromSource(): Rowtime = {
    watermarkStrategy = Some(PreserveWatermarks.INSTANCE)
    this
  }

  def watermarksFromStrategy(strategy: WatermarkStrategy): Rowtime = {
    watermarkStrategy = Some(strategy)
    this
  }

  final override def toProperties: java.util.Map[String, String] = {
    val properties = new DescriptorProperties()

    timestampExtractor.foreach(normalizeTimestampExtractor(_)
      .foreach(e => properties.putString(e._1, e._2)))
    watermarkStrategy.foreach(normalizeWatermarkStrategy(_)
      .foreach(e => properties.putString(e._1, e._2)))

    properties.asMap()
  }
}
```
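- Rowtime provides timestampsFromField, timestampsFromSource, and timestampsFromExtractor for defining how timestamps are obtained, and watermarksPeriodicAscending, watermarksPeriodicBounded, watermarksFromSource, and watermarksFromStrategy for defining the watermark strategy; each call overwrites any extractor or strategy set earlier
Summary
- The connect method of StreamTableEnvironment creates a StreamTableDescriptor; StreamTableDescriptor extends ConnectTableDescriptor; ConnectTableDescriptor provides the withSchema method, which takes a Schema and returns the descriptor itself
- Schema provides the field, from, proctime, and rowtime methods for defining the schema's attributes: proctime declares a processing-time attribute, rowtime declares an event-time attribute, and from references or aliases a field of the input
- Rowtime provides timestampsFromField, timestampsFromSource, and timestampsFromExtractor for defining how timestamps are obtained, and watermarksPeriodicAscending, watermarksPeriodicBounded, watermarksFromSource, and watermarksFromStrategy for defining the watermark strategy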