本文翻译自Samza官网文档Container章节,主要包括Samza容器类、流、序列化、状态管理等内容。

Samza Container

SamzaContainer,负责管理一个或多个StreamTask实例的启动、执行以及关闭。每个SamzaContainer会单独运行在一个JAVA虚拟机之上,也就是说一个SamazaContainer即是一个JAVA进程!一个Samza任务可能由多个SamzaContainer组成,并且这些SamzaContainer可能运行在多个机器节点之上。
当SamzaContainer启动时,它执行以下操作:

  1. 获取该SamzaContainer所处理输入流的每个分区上的最近一次checkpointed(检查点)的位置偏移量
  2. 为每个输入流分区创建一个”read”线程
  3. 开启监控指标报告
  4. 启动checkpointed定时器,定期对StreamTask所处理的输入流的位置偏移量进行记录
  5. 如果StreamTask同时实现了WindowableTask接口,则会启动时间窗口定时器以便触发时间窗口函数(window method)
  6. 对输入流的每个分区创建和初始化一个StreamTask,该过程只会进行一次。
  7. 启动循环读取事件将”read”线程从输入流中读取的消息转发到你的StreamTask中
  8. 以上的所有操作会通知给生命周期监听器

下面通过一个具体的StreamTask实例来综合分析以上的所有执行过程。

Tasks and Partitions

当container启动时,他会实例化你所编写的StreamTask实现类.如果该task同时实现了InitableTask接口,那么SamzaContainer将会调用该接口的init()方法:

1
2
3
4
/** Implement this if you want a callback when your task starts up. */
public interface InitableTask {
void init(Config config, TaskContext context);
}

默认情况下,创建多少个StreamTask实例,取决于samza job所处理的输入流的分区个数。假如你的Samza Job处理的输入流有十个分区,那么samza将会相应的创建10个StreamTask实例:每个分区对应一个StreamTask。每个task处理其对应的的分区上的全部数据。
tasks-and-partitions
分区的个数由samza所接收处理的消息源来指定。比如,Kafka消息系统,可以在创建队列时通过命令行指定该队列的分区个数,也可以在Kafka的server.properties文件中进行配置。
如果一个Samza Job有多个输入流,那么该Samza Job中StreamTask实例的个数为所有输入流中某个输入流的最大分区个数。比如,一个Samza Job同时从队列PageViewEvent(12个分区)以及ServiceMetricEvent(14个分区)接收消息,那么该Samza Job将会创建14个StreamTask实例(task编号从0到13)。那么编号为12和13的task将只会从ServiceMetricEvent接收消息,因为对于PageViewEvent来说并没有与这两个task对应的输入分区。

基于上面这种默认的StreamTask实例分配方式,Samza便可以对输入流中的分区Key来进行group-by操作。其他对于输入流进行group操作的方式,通过实现SystemStreamPartitionGrouper接口以及相应的SystemStreamPartitionGrouperFactory工厂接口,最后通过配置job.systemstreampartition.grouper.factory属性来使其在任务中生效.

Samza provides the above-discussed per-partition grouper as well as the GroupBySystemStreamPartitionGrouper, which provides a separate task class instance for every input stream partition, effectively grouping by the input stream itself. This provides maximum scalability in terms of how many containers can be used to process those input streams and is appropriate for very high volume jobs that need no grouping of the input streams.

考虑到前面示例中PageViewEvent有12个分区,ServiceMetricEvent有14个分区,那么GroupBySystemStreamPartitionGrouper将会创建12+14=26个StreamTask实例,which would then be distributed across the number of containers configured,后文将单独叙述.

注意:一旦一个Samza任务使用了特定的SystemStreamPartitionGrouper同时该任务启用了中间状态存储和checkpointing,那么如果以后使用了新的grouping策略,以前存储的中间状态和checkpointing将不能再被正确使用!

Containers and resource allocation

虽然StreamTask实例个数是固定的(由输入流分区个数决定),但是你可以配置你的Samza Job可以包含多少个containers。如果你使用了Yarn,那么Samza Container个数将取决于机器节点的CPU以及内存的分配情况。
如果你的Samza系统处理的数据量较少,那么一个Samza Container便足以满足需要。在这种情况下,samza仍然会为每个输入分区创建一个StreamTask实例,但是这些task实例将运行于同个Samaza Container中。极端情况下,你也可以为每个输入分区分别创建一个Samza Container,这样,samza会为每个Samza Container分配一个StreamTask实例。

每个Samza Container被设计使用一个CPU Core,因此它使用的是single-threaded event loop.因此不推荐在Samza Container中再去创建新的线程。如果你想拥有更多的parallelism(并行数),那么你可以通过配置更多的Samza Container来实现。

Samza Job中的每个state都只属于某个StreamTask实例,而并不属于某个Samza Container。这点是Samza具有可扩展性的关键设计:如果你需要增加或缩减samza job所使用的资源,那么你可以简单的通过增加或减少Samza Container的个数来实现,但是StreamTask的实例个数并不会发生改变。无论是增加或者减少资源使用,StreamTask以及与其关联的state的映射关系并不会发生改变。当一个StreamTask从一个Samza Container移动到另外一个Samza Container时,与之对应的存储state也会随之移动。这样便可以保证samza job处理的语义保持不变,即便你改变了任务的并行度。

Joining multiple input streams

Samza提供了简单但是功能强大的机制来方便你同时处理多个输入流的情况:每个StreamTask负责处理输入流的单个分区。比如,你需要同时处理A、B两个输入流,而这两个输入流均有四个分区。那么,Samza将会创建四个StreamTask来处理这A、B两个输入,task和分区的对应关系如下所示:
multiple-input-streams
因此,如果你想要在同个StreamTask中处理来自两个不同输入流的数据,那么你需要确保它们会将消息发送到相同的分区号上。当然你可以通过使用相同的分区Key来实现消息发送.关于流聚合的更多细节将会在State Management章节中详细讲述。

注意:Samza目前假设所有的输入流的分区数目永远不会发生变化。因此,Samza并不支持分区拆分或者重新分区。如果某个输入流有N个分区,那么Samza将一直认为该流只有N个分区。如果你想要对某个输入流进行重新分区,你可以自己编写一个job,从输入流中读取消息,然后再将该消息写入一个新的输入流中,此时你可以对该新的输入流根据你想要的分区方式进行分区。比如,前面的示例中,你可以从PageViewEvent topic读取消息,转而写入PageViewEventRepartition topic中。

Streams

samza container使用SystemConsumer以及SystemProducer两个接口来进行消息读写。因此,通过实现这个两个接口你可以集成任何消息队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public interface SystemConsumer {
void start();
void stop();
void register(
SystemStreamPartition systemStreamPartition,
String lastReadOffset);
List<IncomingMessageEnvelope> poll(
Map<SystemStreamPartition, Integer> systemStreamPartitions,
long timeout)
throws InterruptedException;
}
public class IncomingMessageEnvelope {
public Object getMessage() { ... }
public Object getKey() { ... }
public SystemStreamPartition getSystemStreamPartition() { ... }
}
public interface SystemProducer {
void start();
void stop();
void register(String source);
void send(String source, OutgoingMessageEnvelope envelope);
void flush(String source);
}
public class OutgoingMessageEnvelope {
...
public Object getKey() { ... }
public Object getMessage() { ... }
}

Samza本身便提供了(KafkaSystemConsumer和KafkaSystemProducer)来集成Kakfa消息系统。任何消息系统只要实现了Samza所需的语义,便可以被集成进来。见javadoc

SystemConsumers和SystemProducers可以对任何数据类型进行读写。It’s ok if they only support byte arrays — samza具有一个独立分离的serialization layer来对消息对象进行序列化和反序列处理。Samza并没有任何对数据模型或者序列化格式的限制。

可以在samza任务配置文件中配置特定的consumer和producer实现.比如你可以在配置文件中指定消息队列的主机名和端口以及其他一些可能的连接配置。

How streams are processed

如果一个samza job需要处理来自多个输入流的消息,并且这些输入流都有消息需要进行处理,那么默认情况下,这些消息将会通过循环的方式进行处理。假如一个job需要接收来自AdImpressionEvent和AdClickEvent两个输入流的消息,那么StreamTask的process()方法将会调用处理来自AdImpressionEvent的消息,然后再处理AdClickEvent的消息,如此交替进行处理。

假如其中一个输入流没有可用消息被处理了(大部分消息已经被处理了),那么task将会跳过该流继续处理来自其他输入流的消息,但以后会继续监听该流是否有新的可用消息需要处理。

MessageChooser

当一个Samza Container接收了多个来自不同分区的消息时,那么它该如何进行处理呢? 答案是MessageChooser.默认是RoundRobinChooser选择处理器,当然你也可以实现自己的消息选择处理器。

想要使用自己的message chooser,你还需要去实现MessageChooserFactory接口,并在配置文件中配置”task.chooser.class”属性:

1
task.chooser.class=com.example.samza.YourMessageChooserFactory

Prioritizing input streams

有时,多个输入流中某些流可能需要优先处理。比如某个samza job同时处理两个输入流:一个来自实时消息系统,另一个来自批处理系统。在这种情况下,实时处理系统的消息流理应比批处理系统的消息优先处理,这样的话以至于有突发的大量实时数据时而不至于会减慢实时处理速度。
Samza提供了一种机制可以通过配置实现流之间的优先处理级别:systems.<system>.streams.<stream>.samza.priority=<number>,例如:

1
2
systems.kafka.streams.my-real-time-stream.samza.priority=2
systems.kafka.streams.my-batch-stream.samza.priority=1

这样来自my-real-time-stream的消息将比来自my-batch-stream的消息优先被处理。只要my-real-time-stream中有需要处理的消息,那么它将被优先处理。直到my-real-time-stream中没有消息需要处理时,此时samza才会继续处理my-batch-stream中的消息。
每个优先级都有其自己的MessageChooser.对于具有相同优先级的两个输入流它也是有效的。如果有多个来自相同优先级别的输入流的消息,那么将由MessageChooser决定先去处理哪个消息。
当然,你也可以只对某些流定义优先级别,那么没有定义优先级的流将自动视为最低优先级,而这些没有定义优先级的输入流将共享同一个MessageChooser。

Bootstrapping

有时,samza需要先完全处理某个输入流的所有消息(偏移量从0开始到最近的消息)才能再继续去处理其他的输入流。这对某些具有先决处理数据条件的job来说,是非常合适的。samza在完全处理这些先决数据之前不会去处理其他数据。samza通过bootstrap streams实现了这种处理语义。
bootstrap stream看起来好像是一个具有高优先级的输入流,不过还是存在某些微妙的差别的。在允许其他流被处理之前,bootstrap stream会一直等待消费者确认该流的数据已经被完全处理。在等待确认之前,在samza job中只有bootstrap stream这一种数据流,即便由于网络原因或其他因素造成bootstrap stream处理变慢,其他的输入流消息也不会被处理。

bootstrap stream与高优先级输入流的另外一个区别是bootstrap stream的特殊处理只是暂时的:当它的消息被完全处理之后(我们称为“caught up”),它的优先级将会变得和其他输入流一致。
下面是配置一个名叫”my-bootstrap-stream”的bootstrap stream:

1
2
3
systems.kafka.streams.my-bootstrap-stream.samza.bootstrap=true
systems.kafka.streams.my-bootstrap-stream.samza.reset.offset=true
systems.kafka.streams.my-bootstrap-stream.samza.offset.default=oldest

The bootstrap=true parameter enables the bootstrap behavior (prioritization over other streams). The combination of reset.offset=true and offset.default=oldest tells Samza to always start reading the stream from the oldest offset, every time a container starts up (rather than starting to read from the most recent checkpoint).
参数bootstrap=true启用bootstrap behavior(优先于其他输入流),组合配置reset.offset=true以及offset.default=oldest是告诉samza每次container启动时,要从oldest offset开始读取消息,而不是从最近一次的checkpoint位置开始读取。

bootstrap stream也可以同时定义多个,在这种情况下,多个bootstrap stream之间的顺序则由他们的优先级决定。

Batching

在某些情况下,你可以通过一次处理某个分区上的多个消息来提高samza的消息处理能力。samza支持这种操作,即批处理。
比如,你想从一个输入分区一次读取100条消息(先不管MessageChooser),那么你可以像这样进行配置:

1
task.consumer.batch.size=100

With this setting, Samza tries to read a message from the most recently used SystemStreamPartition. This behavior continues either until no more messages are available for that SystemStreamPartition, or until the batch size has been reached.

通过上面的设置samza将尝试从最近使用的SystemStreamPartition来读取消息。这个操作将会持续到该SystemStreamPartition没有可读消息,或者消息数目达到单个批次设置的大小。

When that happens, Samza defers to the MessageChooser to determine the next message to process. It then again tries to continue consume from the chosen message’s SystemStreamPartition until the batch size is reached.

Serialization

streampersistent state store中读取或写入的消息最终都需要被序列化成bytes(然后将其通过网络传输或者写入到磁盘)。在多个地方可能会发生序列化以及反序列化:

  1. 在客户端库中,从kafka中读取或向kafka中写入消息需要可插拔的序列化方式
  2. 在StreamTask的实现中,你的process method需要使用字节数组进行输入输出,因此需要能够对消息进行解析和序列化。
  3. 两者之间,Samza提供了一个序列化和反序列化层,简称serdes.

你可以使用任何你觉得需要的方式来实现你的job,samza并没有对数据模型和序列化方式做任何强制限定。但是,较为优雅的方式是使用samza的serde layer。下面的配置展示了该如何使用它:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Define a system called "kafka"
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
# The job is going to consume a topic called "PageViewEvent" from the "kafka" system
task.inputs=kafka.PageViewEvent
# Define a serde called "json" which parses/serializes JSON objects
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
# Define a serde called "integer" which encodes an integer as 4 binary bytes (big-endian)
serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
# For messages in the "PageViewEvent" topic, the key (the ID of the user viewing the page)
# is encoded as a binary integer, and the message is encoded as JSON.
systems.kafka.streams.PageViewEvent.samza.key.serde=integer
systems.kafka.streams.PageViewEvent.samza.msg.serde=json
# Define a key-value store which stores the most recent page view for each user ID.
# Again, the key is an integer user ID, and the value is JSON.
stores.LastPageViewPerUser.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory
stores.LastPageViewPerUser.changelog=kafka.last-page-view-per-user
stores.LastPageViewPerUser.key.serde=integer
stores.LastPageViewPerUser.msg.serde=json

每个serde实现都有自身的工厂类。samza自带了一些序列化器,如UTF-8 strings,binary-encoded integers, JSON等等。当然,你也可以通过实现SerdeFactory接口,来自定义序列化器。

关于序列化器的名字(如前面例子中的”json”和”integer”)仅仅是为了方便在job配置中使用。你可以定义任何你喜欢的名称。然后通过该名称为每个流或state store配置相应的序列化和反序列化器。
如果你没有配置任何的序列化器,samza只是简单的将消息对象在StreamTask实例和system stream之间传输。在这种情况下,你的task需要去发送和接收来自客户端库中使用的任何数据类型。
samza api中所有涉及消息发送和接收的,都被定义成了Object数据类型,这意味着你在使用消息数据之前,需要首先对其进行正确的数据类型转换。这样,仅仅需要一点点代码,samza便可以不用对消息的数据类型做任何特殊限定。

Checkpointing

Samza提供了对流的容错处理:Samza保证消息不会丢失,无论是你的samza job崩溃,还是节点宕机,或者网络错误,以及其他等等影响因素。为了能够实现容错处理,samza需要你的input stream能够满足以下几点:

  • 该输入流具有一个或多个分区。分区之间都是相互独立的,并且每个分区都有备份策略(这样当一个节点宕机时,该输入流还可以继续工作)
  • 每个分区的数据都以一个固定的序列进行存储。每个消息都有一个位置偏移量(offset),用以标明其在消息序列的位置。分区内消息总是按序进行逐个处理的。
  • samza job可以从消息序列的任何位置(offset)启动消费

除了kafka可以满足以上的这些条件,其他的消息队列也能实现上面这些要求。
就像SamzaContainer章节所描述的那样,一个StreamTask实例负责处理一个分区上的消息。每个task都记录了它所处理的当前输入流中的位置偏移(offset):即下个即将读取的消息在输入流分区上的位置。每当一个消息被读取之后,位置偏移值(offset)将会随之移动变化。

当一个Samza Container崩溃失败了,它需要被重新启动(可能发生其他机器节点),并继续接着之前崩溃的位置进行处理。为了实现这个功能,container需要对其管理的StreamTask实例定期进行checkpoint,记录task当前的offset.
checkpoint
当一个Samza Container启动时,它首先会查找最近的checkpoint,然后从该checkpoint的offset继续消费。如果之前的container突然意外崩溃了,最近的checkpoint offset可能会稍稍落后实际的offset(即崩溃前该container已经处理了多个消息,但还未进行checkpoint),这种情况我们无法确切的获知。因此,重新启动的container可能会重复处理一些消息。

前面这种机制被称为至少处理一次:samza确保即使你的container重启了,也不会有消息丢失。然而,这样可能导致某些消息被重复处理,我们计划在未来的版本中解决这个问题。现在你需要注意的一点是:比如,你有个计算页面访问量的job,强制结束container可能导致计算结果比实际要略高。在这种情况下,可以设置checkpoint的频率更高点,这样可能会导致性能略有下降,但是统计计算的结果会更准确些。

为了使checkpoints发挥作用,它们需要被写入到某个地方,以应对当错误发生时能够被正确取出。samza允许你将checkpoints写入到文件系统(通过FileSystemCheckpointManager实现)中,但这并不足以应对当container失败需要在其他机器节点重启container的状况。因此,更为常见的是将checkpoints记录到kafka中,你可以通过如下配置来实现:

1
2
3
4
5
6
7
8
9
10
11
12
# The name of your job determines the name under which checkpoints will be stored
job.name=example-job
# Define a system called "kafka" for consuming and producing to a Kafka cluster
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
# Declare that we want our job's checkpoints to be written to Kafka
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
# By default, a checkpoint is written every 60 seconds. You can change this if you like.
task.commit.ms=60000

通过上面的配置,samza将会将checkpoints写入一个单独的名为__samza_checkpoint_<job-name>_<job-id>(上面的例子中topic名称将是__samza_checkpoint_example-job_1)的topic中。每隔一分钟,samza将自动向该topic发送一条包含当前输入流offset的消息。当container启动时,它将在这个topic中查找最近的offset,并加载这个checkpoint。

有时,你可以只对某些输入流进行checkpoint,在这种情况下你需要告诉samza需要对哪些输入流进行忽略checkpoint处理:

1
2
3
4
5
# Ignore any checkpoints for the topic "my-special-topic"
systems.kafka.streams.my-special-topic.samza.reset.offset=true
# Always start consuming "my-special-topic" at the oldest available offset
systems.kafka.streams.my-special-topic.samza.offset.default=oldest

下面这张表是对前面配置参数的详细描述:

参数名称 参数值 说明
systems..streams..samza.reset.offset false(default) 当samza container启动时,从最近一次的checkpoint开始恢复处理
true 忽略checkpoint(当作当前没有checkpoint)
systems..streams..samza.offset.default upcoming (default) 当container启动时并没有checkpoint(或checkpoint被忽略),这样当任务开始时仅仅发出启动处理通知,但并没有处理任何旧的消息
oldest 当container启动时并没有checkpoint(或checkpoint被忽略),跳回到系统中最老的可用的消息位置开始消息处理(一些消息很有可能会被重复处理)

注意前面的示例配置会导致每次container重启时都会从最老的可用offset进行重新处理。这对于你把处理状态存储在内存中,当崩溃时你需要从输入流中重建崩溃前的处理状态这种情况将会非常有用。同时,在这种情况下,你会发现bootstrap streams也会很有用处。

Manipulating Checkpoints Manually

如果你想要通过手动方式一次改变任务消费的offset,比如你想使用新的代码去processed again,那么你可以使用CheckpointTool工具进行检查和管理该任务的checkpoint,该工具类被放置在Samza’s source repository.

为了查询一个任务的最新checkpoint,在使用CheckpointTool时你需要指定该任务的配置文件,以便CheckpointTool能够知道它将要多哪个任务进行处理:

1
samza-example/target/bin/checkpoint-tool.sh --config-path=file:///path/to/job/config.properties

该命令将会以属性文件形式打印出该任务的最新checkpoint。你不仅可以将它输出到指定的文件中,同时也可以对其按照你想要的效果对该输出文件进行编辑。比如,你想要回跳到最老的一次checkpoint,那么你可以将所有的offset都设置为0.然后你可以使用 checkpoint-tool.sh将修改后的文件输入到现在的job中。

1
samza-example/target/bin/checkpoint-tool.sh --config-path=file:///path/to/job/config.properties --new-offsets=file:///path/to/new/offsets.properties

需要注意的是samza只在container重启时才会重新读取checkpoint。为了让你修改的检查点生效,你首先需要停止任务,然后保存修改后的offsets,最后再重新启动该任务。如果你在一个任务运行的过程中更改checkpoint,那么你的更改可能会没有任何作用!

State Management

Samza值得关注的另外一个重要特性是有状态的流式处理。Tasks(StreamTask)可以通过Samza提供的相关API来对某些数据进行存储或者查询。而由某个task所产生的数据会被存储在运行该task的节点之上。当对大量数据进行读写操作时,本地存储相对通过网络操作一个远程数据库而言,其处理性能会有更好的表现。为了容错,samza将这些状态数据(state)在多台机器节点上进行了备份(下面将详细描述).

有些流处理任务并不需要存储状态数据:比如某些时候你只需要对某些消息进行转换,或者基于某些条件对消息进行过滤,类似这种简单的处理任务。这些消息之间是相互独立,互不依赖的,这样当一条消息到来时你只需调用process方法进行处理即可。

然而,一些复杂的流处理任务希望能够维护处理的状态数据:比如输入流连接,消息分组以及聚合操作。拿SQL来说,通常select和where操作都是无状态的,而对于join,group by以及聚合函数(如sum和count)这些操作便需要维护一些状态数据。Samza虽然没有提供高级抽象的SQL操作语言,但其提供了一些低层的语原语,基于这些原语你可以实现类似流聚合或者连接的操作。

Common use cases for stateful processing

首先,我们来看一些有状态的流处理例子,这些例子你可能在某些购物网站的后台看到。在本文的稍后部分,我们再来讨论如何基于samza内建的kv存储来实现这种类似功能。

Windowed aggregation

示例:统计某个用户在一个小时之内浏览某个网页的次数

在这种情况下,你可能需要一些随着消息处理而递增的计数器。这种操作一般被限定在一个特定时间范围之内(如1分钟,1个小时或者1天),这样你便可以随时获知用户活跃变化。这种基于时间窗口的实时计算常用于排名和relevance,获悉”热门话题”以及实时报道和监测!

较为简单的存储中间计算结果的方式是将结果放置在内存中(如在task中创建一个HashMap),然后在调用时间窗口时的某个时间点(比如将要结束的时候)将该中间结果从map写入到数据库中,或者直接将其以流的方式写出。然而,你需要考虑当container失败时存储在内存中数据会丢失这一突发状况。也许你能够在当前时间窗口中通过重新计算所有的消息,来重建container崩溃时的中间计算结果,但是如果container在崩溃前已经运行了相当长的时间(也就是说你的时间窗口较长),那么你重建的时间就会越长.而samza通过state(中间计算结果状态)容错而不是重新计算来加快恢复过程。

Table-table join

示例:对user profiles表和user settings表,通过两表的用户id进行join操作,再将结果数据分发到一个输出流中

也许你在想:将两表中的数据合并然后发送到流中是否有意义?如果你的数据库记录了数据变更日志的话,那么它的确会有用。那么数据库便与数据变更日志之间便有了关联关系(duality between a database and a changelog stream):你可以将每次数据库变更的数据发送到一个流中,如果你从头到尾的处理了整个数据流,那么你就可以重建数据库中的数据。samza中的数据处理便是遵循了这一理念.

如果你有数据库中某些表的变更记录的话,那么你可以实现一个简单的流式处理任务,该任务将把这些表的最新状态数据(last state)存储在本地的kv store中,然后你就可以直接访问这些last state而不用再去查询原始数据库,从而获取更好的性能。现在,只要某个表中的数据发生了变化,你就可以通过关联的key以及变化后的最新数据计算出关联表的最新结果。

下面是几个实际使用的例子:

There are several real-life examples of data normalization which essentially work in this way

  • E-commerce companies like Amazon and EBay need to import feeds of merchandise from merchants, normalize them by product, and present products with all the associated merchants and pricing information.
  • Web search requires building a crawler which creates essentially a table of web page contents and joins on all the relevance attributes such as click-through ratio or pagerank.
  • Social networks take feeds of user-entered text and need to normalize out entities such as companies, schools, and skills.

Each of these use cases is a massively complex data normalization problem that can be thought of as constructing a materialized view over many input tables. Samza can help implement such data processing pipelines robustly.

Stream-table join

Example: Augment a stream of page view events with the user’s ZIP code (perhaps to allow aggregation by zip code in a later stage)
Joining side-information to a real-time feed is a classic use for stream processing. It’s particularly common in advertising, relevance ranking, fraud detection and other domains. Activity events such as page views generally only include a small number of attributes, such as the ID of the viewer and the viewed items, but not detailed attributes of the viewer and the viewed items, such as the ZIP code of the user. If you want to aggregate the stream by attributes of the viewer or the viewed items, you need to join with the users table or the items table respectively.
In data warehouse terminology, you can think of the raw event stream as rows in the central fact table, which needs to be joined with dimension tables so that you can use attributes of the dimensions in your analysis.

Stream-stream join

Example: Join a stream of ad clicks to a stream of ad impressions (to link the information on when the ad was shown to the information on when it was clicked)
A stream join is useful for “nearly aligned” streams, where you expect to receive related events on several input streams, and you want to combine them into a single output event. You cannot rely on the events arriving at the stream processor at the same time, but you can set a maximum period of time over which you allow the events to be spread out.
In order to perform a join between streams, your job needs to buffer events for the time window over which you want to join. For short time windows, you can do this in memory (at the risk of losing events if the machine fails). You can also use Samza’s state store to buffer events, which supports buffering more messages than you can fit in memory

More

也存在一些其他的连接与聚合,但一般本质上都是上面集中模式的组合变化。

Approaches to managing task state

那么,流式处理系统该如何实现这种状态处理呢? 这里,我们先看看其他系统是如何处理的,然后再去看看samza是如何实现的。

In-memory state with checkpointing

在流式处理系统中一个简单的方法是定期将内存中的数据持久化到某个持久存储中。这种方法对于内存中数据量较少的情况下,效果还是不错的。但是,你必须在每个检查点(checkpoint)存储整个任务的状态,而这对于持续增长的任务计算将会是一笔比较大的负担。不幸的是,很多需要连接聚合的案例的中间状态结果的数据量都非常大,很多甚至达到数千兆的字节,这使得中间状态持久化存储变得不切实际。

某些系统会去比较最近一次checkpoint相对上次来说发生了哪些状态变化,这样checkpoint时只需更新变化的部分即可。Storm’s Trident abstraction便是将中间结果状态保存在内存中,然后定期将发生变化的部分发送到远程存储如Cassandra.然而,这种方式只适用于大部分状态数据没有发生变化的情况.在某些情况下,比如流连接,中间结果(state)变化起伏可能会较大.

In some use cases, such as stream joins, it is normal to have a lot of churn in the state, so this technique essentially degrades to making a remote database request for every message (see below).

Using an external store

另外一种常见的方案是将中间状态结果(state)存储在外部数据库或者key-value存储中。通过常规数据库备份便可以实现数据库的容错需求。这样的架构看起来像下面这样:
external-store
samza允许你使用这种风格进行处理—-在你的流处理任务过程中完全可以去查询一个远程数据库或者远程服务。但是,对于有状态的流处理,在处理过程中去查询一个远程数据库可能存在一些问题:

  1. 性能: 通过网络去查询一个数据库,效率低下,代价也较高。对于一个单核的CPU的情况下,kafka每秒能够发送几十万甚至上百万的消息到一个流处理系统。但是如果你每处理一条消息都要去查询数据库的话,那么你的流处理系统的吞吐量很可能会立刻下降2-3个数量级。也许,你可能小心翼翼的通过批量读写缓存而进行优化。但是这样又会回到前面我们讨论的问题上去。
  2. 隔离: 假如你的流处理系统使用的远程数据库同时也服务了其他用户,这样是不安全的。一个可扩展的流处理系统的吞吐量通常较高,并且很容易产生较大的负载量(如消息积压)。如果一不小心,可能会造成数据库拒绝提供服务,从而对其他用户的使用造成影响。
  3. 查询效率: 很多可伸缩的数据库(如key-value数据库)提供的查询接口都很有限,因为”全表扫描”或者大量数据遍历的代价非常高。

    Stream processes are often less latency-sensitive, so richer query capabilities would be more feasible

  4. 数据准确性: 当一个流处理任务失败而需要重新启动时,数据库中状态该如何与流处理的任务保持一致呢?为此,一些流处理框架如storm,会在数据库中同时存储一些元数据信息,但是处理过程要非常小心,否则很容易导致流处理结果不准确。
  5. 再处理: 有时你可能需要重新处理以前的历史数据,比如你更新了流处理代码。然而,由于前面的问题,做外部查询可能会变得不太现实。

Local state in Samza

samza允许task实例以不同于前面所述的方式来管理state(中间结果状态):

  • state可以存储在磁盘上,因此可以比内存存储更多的数据。
  • state与任务处理task在同个机器节点上,因此可以避免通过网络传输来查询数据库而带来的性能问题。
  • 每个流处理任务都有独自的数据存储,从而避免了由于共享数据库而带来的”隔离”问题(这样,即便某个任务的数据查询较慢,但却不会影响其他的处理任务)
  • 不同的查询引擎可以容易的被集成进来,这样便能更加丰富查询功能
  • state会连续备份,这样当错误发生而需要恢复数据时,这样不至于在某个checkpoint会有大量state数据。
    想象下,你有一个远程数据源,将其分成若干分区,分区个数与流处理的任务数相等,分区与task任务一一对应,就像下面这样:
    partition-task

假如一个节点宕机了,那么该节点上的所有task都会结束,同时分区数据也会丢失。为了实现高可靠,所有写入分区数据库的数据都将被以”chanageLog”的形式备份到一个持久存储中(如Kafka)。这样,当一个节点宕机了,我们便可以在其他节点上来重启这些任务,接着通过”chanageLog”来重建以往的数据。

注意:每个task仅有访问它本身数据分区的权限,而不能访问其他任务的数据分区。这很重要:当你想要为你的流处理任务job扩展计算资源时,samza可能需要将一些task从一个机器节点移动到另外一个机器节点上。这样,通过每个task各自独有自己的数据分区(state)的方式,task可以在不影响流处理任务(job)的情况下进行迁移。如果有必要,你完全可以对你的数据流进行重新分区,从而可以让所有的消息通过特殊的数据分区方式路由到某个task(StreamTask)实例.

Log compaction会在后台合并changelog topic中的数据,以确保changelog不会无限制的增长。如果某个数值被多次覆盖,那么日志合并程序仅会保留最新版本的数据,并删除所有老的版本数据。如果你从存储中删除一条记录,那么日志合并程序也会将它移除。通过这种协调方式,changelog的大小将不会超过数据本身的大小。

基于这种架构,samza允许task维护大量的容错state,性能表现几乎可以和基于内存的方式相媲美。但是这种方式存在以下几点限制:

  • 如果你有一些数据(跨分区的)想要在任务之间共享,那么你可能需要一些额外的工作来重新分区和分发数据。每个任务需要自己的数据的备份,因此这种方式可能需要使用更多的处理空间。
  • 当一个container重启时,它可能因为需要重建自身所有的分区状态(state partitions)而占用一些时间。时间的长短取决于数据量的大小,你使用的存储引擎以及访问访问,以及其他因素。理论上来说,数据重建的速度在50M/秒左右。

如果你想,你仍然可以使用外部数据库,但是在大多数情况下,Samza的本地状态存储(state store)更适合流处理任务。

Key-value storage

就像接下来所述的那样,任何存储引擎都可以被集成到samza中。Samza本身也提供了一个基于RocksDB通过JNI API实现的、开箱即用的key-value存储实现。

RocksDB自身有些非常好的特性。它在Java堆外进行内存分配,这样比基于java的存储引擎能更好的管理内存(基于java的存储引擎可能会经常触发垃圾回收从而造成程序暂停)。可以非常快速的将一些小型的数据集放入内存;当数据大小超过内存大小时,速度也可能一样会很快;那是因为它的日志结构(log-structured),允许快速写入。它同时还支持数据块压缩,这将帮助减少I/O数据量以及内存占用大小。

在RocksDB前面,samza还提供了一个基于内存的缓存层,用来缓存频繁访问的需要反序列化的对象,以及实现批量写操作。如果某个key值被紧密的连续更新多次,那么批量操作会合并多次更新成单一的写操作。当task commit时,将其写入changelog.

为了在job中使用一个key-value存储,你需要像下面这样就行配置(job config):

1
2
3
4
5
6
7
8
9
10
11
12
# Use the key-value store implementation for a store called "my-store"
stores.my-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
# Use the Kafka topic "my-store-changelog" as the changelog stream for this store.
# This enables automatic recovery of the store after a failure. If you don't
# configure this, no changelog stream will be generated.
stores.my-store.changelog=kafka.my-store-changelog
# Encode keys and values in the store as UTF-8 strings.
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
stores.my-store.key.serde=string
stores.my-store.msg.serde=string

参考serialization section获得更多关于serde(序列化)的选择。

下面是展示task如何将每次接收的消息放置到store中:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MyStatefulTask implements StreamTask, InitableTask {
private KeyValueStore<String, String> store;
public void init(Config config, TaskContext context) {
this.store = (KeyValueStore<String, String>) context.getStore("my-store");
}
public void process(IncomingMessageEnvelope envelope,
MessageCollector collector,
TaskCoordinator coordinator) {
store.put((String) envelope.getKey(), (String) envelope.getMessage());
}
}

这是完整的key-value store :

1
2
3
4
5
6
7
8
public interface KeyValueStore<K, V> {
V get(K key);
void put(K key, V value);
void putAll(List<Entry<K,V>> entries);
void delete(K key);
KeyValueIterator<K,V> range(K from, K to);
KeyValueIterator<K,V> all();
}

更多关于key-value store的配置项可以参考configuration reference

Known Issues

RocksDB存在一些没有完善的问题。我们推荐你去参考RocksDB优化指南.下面是一些值得注意的地方:

  1. RocksDB对于使用SSD固态硬盘做了很大的优化,如果使用的是普通的存储磁盘,性能表现可能会大打折扣。
  2. Samza的KeyValueStorageEngine.putAll()方法当前使用的并不是RocksDB的batching-put API,因为它是non-functional in Java
  3. 当在store中存在大量删除数据时,调用iterator.seekToFirst()方法将会变得很慢。

Implementing common use cases with the key-value store

在本节前面我们讨论了一些关于流处理的实例。接下来,我们看下基于key-value存储引擎如samza的RocksDB该如何实现这些流处理需求。

Windowed aggregation

示例: 统计某个用户在一个小时之内浏览某个网页的次数
实现方式: 两个处理阶段

  1. 首先根据用户ID对输入流进行分区,这样一个用户的事件总是由某个相同的流处理任务来处理。如果你的输入流已经是按用户ID进行了分区,那么你可以跳过此步。
  2. 第二个阶段是计算,在key-value存储中每个用户ID与其计算结果进行映射。当接收到一个新的事件时,从key-value存储中查询出该用户的计算结果,计算结果累加,然后再写回到存储中。当一个事件窗口触发时(如一个小时),该处理任务便对存储中得数据(userId->count)进行遍历,并将遍历结果发射到另外一个输出流中。
    注意,当时间窗口运行时当前的处理任务(job)实际上是暂停的。这对samza来说是完全正常的,因为遍历key-value存储的速度是非常快得。而此时samza会对输入流数据进行缓存。

Table-table join

示例: 对user profiles表和user settings表,通过两表的用户id进行join操作,再将结果数据分发到一个输出流中
实现方式: 流处理任务订阅user profiles和user settings表的数据变更日志流,这两个表都通过用户ID进行分区。该任务的key-value存储中以user_id作为key值,而value中保存了与该user_id对应的最新的user profiles和user settings数据记录。当接收到新的数据时,该流处理任务首先根据user_id查询最新的数据,然后更新合适的字段值(这取决于是user profiles的更新还是user settings的更新),然后将最新的join结果更新到key-value存储中。而key-value存储的变更日志也同时作为该task的输出流。

Table-stream join

示例: 对用户浏览页面统计实例进行增强,追加用户的邮政编码(在以后可能会根据用户邮政编码进行聚合统计)
实现方式: 流处理任务同时订阅了user profile更新数据流以及页面浏览事件流。这两个流都以user_id进行分区。该任务维护一个key-value存储,存储以用户ID以及邮编作为Key值。每当user profile更新时,它将抽取用户的最新邮编(ZIP)并将其更新到key-value存储中。当接收到一个页面浏览事件时,它将浏览页面的用户信息以及邮编作为一个消息发射到一个新的流中。

如果下个阶段需要根据用户邮编进行聚合计算,那么可以将用户邮编作为前面输出流的分区key值。从而确保相同的邮编被发送到同一个数据流中。

Stream-stream join

示例: 连接处理点击数据流和广告展现量(广告链接被点击事件)
在这个例子中,我们假设每次广告展示都有一个唯一的标识符,如一个UUID,广告展现以及点击事件包含着相同的标识符。这个标识符作为广告与点击事件的连接键(join key)。
实现方式: 点击事件和展示事件分别以展现ID和用户ID进行分区(假设这两个事件分别具有相同的展示ID以及用户ID)。该处理任务包含了两份存储,一个存储点击事件,一个存储广告展示事件。两份存储都以前面的唯一标识符作为key值。当该任务接收到一个点击事件时,则从展示事件存储中查询相应的展示,反之亦然。如果两者匹配上了,那么它们将成对的发送出去同时分别将两者从各自的存储中删除。如果没有匹配的,则将它写入自己的存储中。当时间窗口函数被调用时,删除以前没有匹配的事件。

Other storage engines

Samza的容错机制(发送到本地存储的写操作会被通过changelog进行备份)完全与存储引擎的存储结构以及查询的API解耦。同时一个Key-Value的存储引擎能够满足常用的处理操作,而通过实现SamzaEngine接口可以很容易扩展你需要的查询方式。Samza的模式特别适合嵌入式的存储引擎(该存储引擎与Samza StreamTask运行在同个进程内)

Some ideas for other storage engines that could be useful: a persistent heap (for running top-N queries), approximate algorithms such as bloom filters and hyperloglog, or full-text indexes such as Lucene. (Patches accepted!)

Fault tolerance semantics with state

正如前面checkpointing所描述的那样,当Samza组件发生故障时仅保证消息”至少处理一次”。这意味着,如果处理任务失败,Samza中的消息不会丢失,但有些消息可能会被重复处理。

if the effect of a message on state is idempotent, it is safe for the same message to be processed more than once.

比如前面用户邮政编码处理,如果user profile消息被重复两次基本没有什么影响,因此在重复处理时并不会改变用户的邮编。

但是对于一些non-idempotent(非等幂)操作如计数,那么”至少处理一次”语义可能会导致结果不准确。当Samza任务重启时,可能会对某些消息进行重复计算。我们计划在未来的版本中对这一问题进行修复。

Metrics

当在生产环境中运行流处理任务时,通过一些指标来监控你的处理任务是否正常,这点是很重要的。为了使监控实现起来较为容易,Samza内置了一些指标库。Samza本身会产生一些常用的指标,比如消息吞吐量。当然,你可以在你的任务代码中实现自己的监控指标。

监控指标值可被以各种方式进行展现。在开发环境下,你可以使用JMX,这时非常有用的。在生产环境中,一个常用的做法是定期将每个Samza Container的指标结果进行聚合后发送到kafka的一个”Metrics”topic(队列).然后你可以新建一个samza job来处理”Metrics” topic中的消息,最后再以你喜欢的图形方式进行展现,如Graphite

为了将你的处理任务指标发送到Kafka中,你可以进行如下配置:

1
2
3
4
5
6
7
8
9
10
11
12
# Define a metrics reporter called "snapshot", which publishes metrics
# every 60 seconds.
metrics.reporters=snapshot
metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
# Tell the snapshot reporter to publish to a topic called "metrics"
# in the "kafka" system.
metrics.reporter.snapshot.stream=kafka.metrics
# Encode metrics data as JSON.
serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
systems.kafka.streams.metrics.samza.msg.serde=metrics

通过上面的配置,samza job会每隔一分钟自动将监控指标数据以JSON消息的方式发送到Kafka的”metrics”队列。消息内容样式类似下面这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
{
"header": {
"container-name": "samza-container-0",
"host": "samza-grid-1234.example.com",
"job-id": "1",
"job-name": "my-samza-job",
"reset-time": 1401729000347,
"samza-version": "0.0.1",
"source": "Partition-2",
"time": 1401729420566,
"version": "0.0.1"
},
"metrics": {
"org.apache.samza.container.TaskInstanceMetrics": {
"commit-calls": 7,
"commit-skipped": 77948,
"kafka-input-topic-offset": "1606",
"messages-sent": 985,
"process-calls": 1093,
"send-calls": 985,
"send-skipped": 76970,
"window-calls": 0,
"window-skipped": 77955
}
}
}

每个任务实例的指标消息都是相互隔离的,通过”header”部分你能知道job name,job ID,以及task在该job的分区。指标数据包含了你已经处理和发送了多少消息,当前输入流的位置偏移量以及其他一些详细信息。同时,它还能包含一些关于JVM附加信息(堆大小,垃圾收集信息以及线程等),kafka自身的生产者和消息者细心,等等。

假如你想去关注一些其他的指标信息,你可以非常容易在你的Job中实现。你可以通过Samza的built-in metrics框架来实现,该框架设计非常类似Coda Hale’s metrics library.

你可以通过MetricsRegistry来注册你的自定义指标程序。你的Stream Task需要实现 InitableTask接口,以便你能够从TaskContext中获取注册信息。下面是一个统计消息数的简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class MyJavaStreamTask implements StreamTask, InitableTask {
private Counter messageCount;
public void init(Config config, TaskContext context) {
this.messageCount = context
.getMetricsRegistry()
.newCounter(getClass().getName(), "message-count");
}
public void process(IncomingMessageEnvelope envelope,
MessageCollector collector,
TaskCoordinator coordinator) {
messageCount.inc();
}
}

Samza目前支持两种类型的指标:countersgauges.通过counters你可以追踪某个指标对象出现的次数,gauges可以实现类似警报功能比如某个缓冲区的大小。每个task实例(每个输入分区)都有各自独立的指标值。

如果你想实现其他方式的指标报表系统,直接将指标展现在图表上(不通过kafka)。那么,你可以去实现一个MetricsReporterFactory并将其配置在你的Job配置文件中。

Windowing

有时,不管现在有多少消息正在处理,你需要定期触发一个流处理任务。比如,你想要获取某个页面每分钟的浏览量。为了实现这种功能,你需要一个递增的计数器,当接收一个页面浏览事件时该计数器就加1.然后,每隔一分钟,便将当前的计数结果发送到一个输出流中,最后再将计数器清零。

Samza的窗口功能能够帮助你在task中实现一个定期触发处理事件,比如每隔一分钟。你可以像下面这样进行配置来启用该窗口函数:

1
2
# Call the window() method every 60 seconds
task.window.ms=60000

接着,你需要实现一个WindowableTask接口。该接口中的window()方法会按照你在前面配置的时间间隔被Samza定期调用。
下面是一个时间窗口大小为一分钟的计数器实现示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class EventCounterTask implements StreamTask, WindowableTask {
public static final SystemStream OUTPUT_STREAM =
new SystemStream("kafka", "events-per-minute");
private int eventsSeen = 0;
public void process(IncomingMessageEnvelope envelope,
MessageCollector collector,
TaskCoordinator coordinator) {
eventsSeen++;
}
public void window(MessageCollector collector,
TaskCoordinator coordinator) {
collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, eventsSeen));
eventsSeen = 0;
}
}

你可以使用window()方法中的MessageCollector参数,来将消息发送出去。但是建议不要再window()函数之外使用它。

注意: window()方法和”process()”方法会被同一个线程执行(single-threaded execution),也就说window()方法和process()方法不会被同时调用(对于一个StreamTask实例而言)。因此,你不用担心你的代码会有线程安全问题(不需要任何同步),但是如果你的window()方法执行时间较长的话,那么你的process()方法也会需要较长时间返回。

Event Loop

事件循环即container由单个线程负责读写消息,刷新指标数据,checkpointing以及windowing。
samza之所以会每个container一个线程是在于container被设计成使用单个CPU core。为了获得更多的并发量,你可以多运行几个container。这样的话可能会比多线程占用更多的资源,但是它也同时简化了资源管理以及任务之间的隔离。这样有助于实现Samza job的多租户集群即不同用户的作业同时运行而不会彼此造成影响。
Samza的设计强烈不推荐你在流处理任务中开启多线程。Samza内部使用多线程来进行输入输出之间的通讯,但是所有的消息处理以及用户代码都是以单线程循环事件来执行。总之,Samza也并非线程安全的。(这段翻译的有些疑惑,不太明白文档的真实含义,以此贴上原文)

You are strongly discouraged from using threads in your job’s code. Samza uses multiple threads internally for communicating with input and output streams, but all message processing and user code runs on a single-threaded event loop. In general, Samza is not thread-safe.

Event Loop Internals

一个Container可能会有多种SystemConsumers来处理不同输入流的消息。每个SystemConsumers都使用自己的线程读取消息,但是所有的SystemConsumers会将消息都写进一个进程的消息队列。然后,container会将该队列的所有消息发送到循环事件处理任务中。
循环处理事件会按照下面的步骤来处理消息:

  1. 从消息队列中取出一条消息
  2. 将该消息发送到对应或者说合适的task实例process()方法中。
  3. 如果该task实例实现了WindowableTask接口,那么当到达时间窗口的调用时机时,将会调用该task的window()方法。
  4. process()window()方法的处理结果发送到对应的SystemProducers
  5. 当某个任务的提交时间点结束时,写入该任务的checkpoints。

container会循环执行以上的过程,直到它被关闭。值得注意的是尽管一个container中可能会运行多个task实例(取决于输入流的分区个数),但是它们的process()window()方法都会在一个线程中被调用,而不是多个线程中并发执行。

Lifecycle

想在container的生命周期中添加自己的hook的唯一方式就是通过实现InitableTask, ClosableTask, StreamTask,以及WindowableTask接口。可以对一个StreamTask进行封装添加自己的处理逻辑,然后该StreamTask可以被另外一个实现了自定义逻辑的StreamTask调用(这段挺绕的,翻译的可能不准确,下面是原文)。

In cases where pluggable logic needs to be added to wrap a StreamTask, the StreamTask can be wrapped by another StreamTask implementation that handles the custom logic before calling into the wrapped StreamTask.

一个具体的例子,一组StreamTask想要在它们的process()方法中添加共享的try/catch处理逻辑。那么一个StreamTask用来封装原始的StreamTasks,围绕原始的process()方法调用try/catch处理逻辑。关于更多的细节内容,可以参考this discussion

JMX

Samza的container以及Yarn的ApplicationMaster都默认支持JMX.JMX可以被用来管理和操作JVM。比如你可以使用JDK中的jconsole

你可以让Samza发布它内部的指标或者你自定义指标作为JMX Beans。为了实现此功能,你需要如下配置:

1
2
3
4
5
# Define a Samza metrics reporter called "jmx", which publishes to JMX
metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
# Use it (if you have multiple reporters defined, separate them with commas)
metrics.reporters=jmx

JMX的运行需要占用一个特定的端口,但是在分布式环境下,你很难去确定某台机器有哪些空闲的未被占用的端口。因此,Samza被设计为JMX随机选取一个端口来使用。如果你想使用Samza的JMX功能,你可以去查看Container的日志,该日志中含有该Container的JMX服务的详细信息。

1
2
3
2014-06-02 21:50:17 JmxServer [INFO] According to InetAddress.getLocalHost.getHostName we are samza-grid-1234.example.com
2014-06-02 21:50:17 JmxServer [INFO] Started JmxServer registry port=50214 server port=50215 url=service:jmx:rmi://localhost:50215/jndi/rmi://localhost:50214/jmxrmi
2014-06-02 21:50:17 JmxServer [INFO] If you are tunneling, you might want to try JmxS

Comments