Skip to content

Commit

Permalink
docs: synced via GitHub Actions
Browse files Browse the repository at this point in the history
  • Loading branch information
nop-cao committed Jan 7, 2025
1 parent 1c57935 commit b6f23f3
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 0 deletions.
23 changes: 23 additions & 0 deletions src/dev-guide/cluster/event-queue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# 事件队列

## 数据库事件表

### 广播

1. topic以 `bro-`为前缀
2. 启动时查询 `eventTime > currentTime - eventTimeOffset` 的事件,并依次处理

```sql
select o
from event o
where o.topic = 'test'
and o.eventTime > ${startTime}
and o.eventID > lastProcessEventID
order by o.eventID asc
```

### 集群分工

1. topic以 `cluster-`为前缀
2. 查询所有 `processTime < currentTime`的记录,修改processTime为`currentTime + maxProcessSpan`,这样其他节点就不会处理同样的事件
3. 成功处理完毕后将事件状态设置为processed,或者删除事件
2 changes: 2 additions & 0 deletions src/dev-guide/orm/logical-delete.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ deleteVersionProp用于避免逻辑删除时唯一键依然存在导致与正常
缺省情况下EQL查询和关联集合查询都会考虑到逻辑删除条件,只查询`delFlag=0`的记录。

QueryBean和SQL对象都支持disableLogicalDelete设置,从而禁用逻辑删除的过滤条件。

IOrmEntity上也具有`orm_disableLogicalDelete()`方法,则可以真正执行物理删除,否则`session.delete(entity)`实际只会设置entity的deleteFlag属性。
14 changes: 14 additions & 0 deletions src/dev-guide/orm/optimistic-lock.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# 乐观锁

## 启用乐观锁

在Excel模型中,为字段指定【数据域】为version,则会设置实体的versionProp配置,它的类型应该是INTEGER或者BIGINT。

更新数据库时会自动加入version条件,并自动对version+1。

如果更新了多条记录,则EntityPersisterImpl中将会抛出`nop.err.orm.update-entity-multiple-rows`异常.
如果更新失败,更新条目数为0,则会抛出`nop.err.orm.update-entity-not-found`异常。

## 禁用乐观锁

`entity.orm_disableVersionCheckError(true)`可以禁用指定实体的乐观锁更新检查,按照versionProp更新失败时不会抛出异常,但是会导致实体被设置为readonly。
77 changes: 77 additions & 0 deletions src/dev-guide/stream/windowing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# 窗口函数

## WindowOperator

1. 先为元素分配一组窗口
2. 针对每个窗口,检查trigger是否触发,并修改timer状态
3. timer触发时,同样是检查trigger是否触发

```javascript
void processElement(StreamRecord<IN> element){
const elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
for (const window of elementWindows) {
TriggerResult triggerResult = triggerContext.onElement(element);

if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents != null) {
emitWindowContents(actualWindow, contents);
}
}

if (triggerResult.isPurge()) {
windowState.clear();
}
registerCleanupTimer(actualWindow);
}
}

void onEventTime(InternalTimer<K, W> timer) {
triggerContext.key = timer.getKey();
triggerContext.window = timer.getNamespace();

MergingWindowSet<W> mergingWindows;

if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
if (stateWindow == null) {
// Timer firing for non-existent window, this can only happen if a
// trigger did not clean up timers. We have already cleared the merging
// window and therefore the Trigger state, however, so nothing to do.
return;
} else {
windowState.setCurrentNamespace(stateWindow);
}
} else {
windowState.setCurrentNamespace(triggerContext.window);
mergingWindows = null;
}

TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());

if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents != null) {
emitWindowContents(triggerContext.window, contents);
}
}

if (triggerResult.isPurge()) {
windowState.clear();
}

if (windowAssigner.isEventTime()
&& isCleanupTime(triggerContext.window, timer.getTimestamp())) {
clearAllState(triggerContext.window, windowState, mergingWindows);
}

if (mergingWindows != null) {
// need to make sure to update the merging state in state
mergingWindows.persist();
}
}
```

EvictorWindowOperator和WindowOperator的区别仅在于emit的时候是否调用evictor来删除窗口。

0 comments on commit b6f23f3

Please sign in to comment.