2024-08-19 22:26:10 +03:00
# include <assert.h>
# include <limits.h>
# include "../graph.h"
# include "../module_registry.h"
# include "../queue.h"
# include "../hash_table.h"
typedef TYPED_HASH_TABLE ( bool ) EventSet ;
typedef struct {
GraphNode as_GraphNode ;
EventData terminator_prototype ;
bool has_terminator ;
bool is_jumping ;
bool has_max_time , has_max_length ;
RelativeTime max_time ;
size_t max_length ;
size_t additional_step ;
size_t skip_next ;
Queue buffer ;
EventSet buffered_set ;
} WindowGraphNode ;
inline static HashTableKey
hash_event_ptr ( const EventNode * ptr ) {
return hash_table_key_from_bytes ( ( void * ) & ptr , sizeof ( ptr ) ) ;
}
inline static bool
event_set_has ( const EventSet * set , const EventNode * element ) {
return hash_table_find ( set , hash_event_ptr ( element ) ) > = 0 ;
}
inline static bool
event_set_add ( EventSet * set , const EventNode * element ) {
const bool value = true ;
return hash_table_insert ( set , hash_event_ptr ( element ) , & value ) > = 0 ;
}
inline static bool
event_set_del ( EventSet * set , const EventNode * element ) {
return hash_table_delete_by_key ( set , hash_event_ptr ( element ) ) ;
}
static EventNode *
replicate_and_advance ( EventNode * * ptr )
{
EventNode * old = * ptr ;
if ( ! old ) {
return NULL ;
}
if ( event_replicate ( old , 1 ) ) {
* ptr = old - > next ;
return old ;
}
return NULL ;
}
static void
trigger_new_window ( WindowGraphNode * node , EventNode * base )
{
if ( ! base ) {
return ;
}
if ( node - > has_terminator ) {
EventNode * terminator = replicate_and_advance ( & base ) ;
if ( terminator ) {
terminator - > data . code = node - > terminator_prototype . code ;
terminator - > data . modifiers = modifier_set_copy ( node - > terminator_prototype . modifiers ) ;
terminator - > data . payload = node - > terminator_prototype . payload ;
2024-08-21 14:32:48 +03:00
// Preserve ttl, priority, time
2024-08-20 14:43:24 +03:00
graph_node_broadcast_forward_event ( & node - > as_GraphNode , terminator ) ;
2024-08-19 22:26:10 +03:00
}
}
size_t step = 1 ;
if ( node - > is_jumping ) {
step = queue_length ( & node - > buffer ) ;
}
step + = node - > additional_step ;
if ( step < 1 ) {
step = 1 ;
}
while ( step > 0 ) {
QueueValue popped ;
if ( ! queue_try_pop ( & node - > buffer , & popped ) ) {
break ;
}
- - step ;
EventNode * event = popped . as_ptr ;
event_set_del ( & node - > buffered_set , event ) ;
}
node - > skip_next + = step ;
QUEUE_FOREACH_INDEX ( i , & node - > buffer ) {
EventNode * orig = node - > buffer . values [ i ] . as_ptr ;
if ( ! orig ) {
continue ;
}
EventNode * recipient = replicate_and_advance ( & base ) ;
if ( ! recipient ) {
continue ;
}
2024-08-21 14:32:48 +03:00
recipient - > data = event_data_copy ( orig - > data ) ;
2024-08-20 14:43:24 +03:00
graph_node_broadcast_forward_event ( & node - > as_GraphNode , recipient ) ;
2024-08-19 22:26:10 +03:00
}
event_destroy ( base ) ;
}
static bool
handle_event ( EventPositionBase * self , EventNode * event )
{
WindowGraphNode * node = DOWNCAST ( WindowGraphNode , GraphNode , DOWNCAST ( GraphNode , EventPositionBase , self ) ) ;
if ( event_set_has ( & node - > buffered_set , event ) ) {
return false ;
}
EventNode * replacement ;
const AbsoluteTime new_time = event - > data . time ;
if ( node - > has_max_time ) {
const RelativeTime threshold = node - > max_time ;
while ( queue_length ( & node - > buffer ) > 0 ) {
EventNode * first_event = queue_peek ( & node - > buffer ) . as_ptr ;
if ( ! first_event ) {
break ;
}
RelativeTime delta = absolute_time_sub_absolute ( new_time , first_event - > data . time ) ;
if ( relative_time_cmp ( delta , threshold ) < = 0 ) {
break ;
}
// Here replacement should occur simultaneously-before the event
replacement = replicate_and_advance ( & event ) ;
if ( ! replacement ) {
return false ;
}
event - > position = self ;
replacement - > position = NULL ;
trigger_new_window ( node , replacement ) ;
}
}
if ( node - > skip_next > 0 ) {
- - ( node - > skip_next ) ;
return true ;
}
// Replacement should occur after the forwarded replica
replacement = NULL ;
if ( event_replicate ( event , 1 ) ) {
replacement = event - > next ;
}
if ( event_replicate ( event , 1 ) ) {
2024-08-20 14:43:24 +03:00
graph_node_broadcast_forward_event ( & node - > as_GraphNode , event - > next ) ;
2024-08-19 22:26:10 +03:00
}
queue_put ( & node - > buffer , ( QueueValue ) { . as_ptr = event } ) ;
event_set_add ( & node - > buffered_set , event ) ;
if ( node - > has_max_length ) {
const size_t threshold = node - > max_length ;
while ( replacement & & queue_length ( & node - > buffer ) > = threshold ) {
trigger_new_window ( node , replicate_and_advance ( & replacement ) ) ;
}
}
if ( replacement ) {
event_destroy ( replacement ) ;
}
self - > waiting_new_event = true ;
return true ;
}
static void
load_event_prototype ( InitializationEnvironment * env , const config_setting_t * setting , EventData * proto )
{
if ( ! setting ) {
return ;
}
proto - > code . ns = env_resolve_constant ( env , config_setting_get_member ( setting , " namespace " ) ) ;
proto - > code . major = env_resolve_constant ( env , config_setting_get_member ( setting , " major " ) ) ;
proto - > code . minor = env_resolve_constant ( env , config_setting_get_member ( setting , " minor " ) ) ;
proto - > payload = env_resolve_constant ( env , config_setting_get_member ( setting , " payload " ) ) ;
// TODO modifiers
}
static GraphNode *
create ( GraphNodeSpecification * spec , GraphNodeConfig * config , InitializationEnvironment * env )
{
WindowGraphNode * node = T_ALLOC ( 1 , WindowGraphNode ) ;
if ( ! node ) {
return NULL ;
}
bool is_jumping = false ;
ssize_t additional_step = 0 ;
long long max_length = 0 ;
long long max_milliseconds = 0 ;
EventData terminator = { . code = { 0 , 0 , 0 } , . payload = 0 , . priority = 10 , . modifiers = EMPTY_MODIFIER_SET , . time = { } } ;
bool
has_max_length = false ,
has_max_time = false ,
has_terminator = false ;
if ( config - > options ) {
is_jumping = env_resolve_constant ( env , config_setting_get_member ( config - > options , " is_jumping " ) ) ! = 0 ;
additional_step = ( ssize_t ) env_resolve_constant ( env , config_setting_get_member ( config - > options , " additional_step " ) ) ;
if ( additional_step < 0 ) {
additional_step = 0 ;
}
const config_setting_t * setting ;
if ( ( setting = config_setting_get_member ( config - > options , " max_length " ) ) ) {
has_max_length = true ;
max_length = env_resolve_constant ( env , setting ) ;
if ( max_length < 0 ) {
max_length = 0 ;
}
if ( max_length > INT32_MAX ) {
max_length = 0 ;
has_max_length = false ;
}
}
if ( ( setting = config_setting_get_member ( config - > options , " max_milliseconds " ) ) ) {
has_max_time = true ;
max_milliseconds = env_resolve_constant ( env , setting ) ;
if ( max_milliseconds < 0 ) {
max_milliseconds = 0 ;
}
}
if ( ( setting = config_setting_get_member ( config - > options , " terminator " ) ) ) {
has_terminator = true ;
load_event_prototype ( env , setting , & terminator ) ;
}
}
* node = ( WindowGraphNode ) {
. as_GraphNode = {
. as_EventPositionBase = {
. handle_event = & handle_event ,
. waiting_new_event = false ,
} ,
. specification = spec ,
. inputs = EMPTY_GRAPH_CHANNEL_LIST ,
. outputs = EMPTY_GRAPH_CHANNEL_LIST ,
} ,
. terminator_prototype = terminator ,
. has_terminator = has_terminator ,
. is_jumping = is_jumping ,
. has_max_time = has_max_time ,
. has_max_length = has_max_length ,
. max_time = relative_time_from_millisecond ( max_milliseconds ) ,
. max_length = max_length ,
. additional_step = additional_step ,
. skip_next = 0 ,
. buffer = EMPTY_QUEUE ,
. buffered_set = { } ,
} ;
hash_table_init ( & node - > buffered_set , NULL ) ;
return & node - > as_GraphNode ;
}
static void destroy
( GraphNodeSpecification * self , GraphNode * target )
{
( void ) self ;
WindowGraphNode * node = DOWNCAST ( WindowGraphNode , GraphNode , target ) ;
modifier_set_destruct ( & node - > terminator_prototype . modifiers ) ;
queue_deinit ( & node - > buffer ) ;
hash_table_deinit ( & node - > buffered_set ) ;
free ( target ) ;
}
GraphNodeSpecification nodespec_window = ( GraphNodeSpecification ) {
. create = & create ,
. destroy = & destroy ,
. register_io = NULL ,
. name = " window " ,
. documentation = " Passes events through while copying them into an internal buffer, when buffer buffer.length or (buffer.last.time - buffer.first.time) thresholds are met optionally sends terminator event, rewinds events to buffer start, skips ((is_jumping ? buffer.length : 1) + additional_step) events, retransmits remaining buffered events and starts over \n Accepts events on any connector \n Sends events on all connectors "
" \n Option 'is_jumping' (optional): whether to send events at most once "
" \n Option 'additional_step' (optional): natural number --- additional step relative to a regular sliding/jumping window "
" \n Option 'max_length' (optional): natural number --- maximum number of events in a window "
" \n Option 'max_milliseconds' (optional): natural number --- maximum number milliseconds between the first and the last event in a window "
" \n Option 'terminator' (optional): event to send after the window fullness condition is met: "
" \n \t Field 'namespace' (optional): set generated event code namespace "
" \n \t Field 'major' (optional): set generated event code major "
" \n \t Field 'minor' (optional): set generated event code minor "
" \n \t Field 'payload' (optional): set generated event payload "
,
} ;
MODULE_CONSTRUCTOR ( init )
{
register_graph_node_specification ( & nodespec_window ) ;
}