Index: openacs-4/packages/acs-tcl/tcl/cluster-init.tcl =================================================================== RCS file: /usr/local/cvsroot/openacs-4/packages/acs-tcl/tcl/cluster-init.tcl,v diff -u -r1.1.2.7 -r1.1.2.8 --- openacs-4/packages/acs-tcl/tcl/cluster-init.tcl 21 Jun 2022 10:50:39 -0000 1.1.2.7 +++ openacs-4/packages/acs-tcl/tcl/cluster-init.tcl 29 Dec 2022 13:02:48 -0000 1.1.2.8 @@ -42,9 +42,20 @@ #ns_register_filter trace GET $url ::acs::Cluster ns_register_filter preauth GET $url ::acs::Cluster + #ns_register_filter postauth GET $url ::acs::Cluster #ad_register_filter -priority 900 preauth GET $url ::acs::Cluster -} + ns_register_proc GET $url ::acs::Cluster incoming_request + + ns_atstartup { + ns_log notice "CHECK ::throttle '[::info commands ::throttle]'" + if {0 && [::info commands ::throttle] ne ""} { + ns_log notice "CHECK calling ::acs::Cluster check_nodes" + throttle do ::acs::Cluster check_nodes + } + } +} +ns_log notice "cluster-init done" # # Local variables: # mode: tcl Index: openacs-4/packages/acs-tcl/tcl/cluster-procs.tcl =================================================================== RCS file: /usr/local/cvsroot/openacs-4/packages/acs-tcl/tcl/cluster-procs.tcl,v diff -u -r1.1.2.1 -r1.1.2.2 --- openacs-4/packages/acs-tcl/tcl/cluster-procs.tcl 21 Jun 2022 10:50:39 -0000 1.1.2.1 +++ openacs-4/packages/acs-tcl/tcl/cluster-procs.tcl 29 Dec 2022 13:02:48 -0000 1.1.2.2 @@ -36,11 +36,12 @@ # # First, execute the command on the local server. # - eval $args + set result [eval $args] # # Then, distribute the command to all servers in the cluster. 
# ::acs::Cluster broadcast {*}$args + return $result } proc cache_flush_all {cache pattern} { @@ -58,6 +59,7 @@ :property host :property {port 80} :property {url /acs-cluster-do} + :property {chan} set :allowed_host_patterns [list] set :url /acs-cluster-do @@ -79,6 +81,7 @@ callback "" ns_cache "^ns_cache\s+eval" ns_cache_flush "" + util_memoize_flush_regexp_local "" ns_urlspace "" acs::cache_flush_all "" } @@ -93,19 +96,21 @@ # # Handling the ns_filter methods # - :public object method trace args { - #:log "trace" - return filter_return - } - :public object method preauth args { - #:log "preauth" - :incoming_request - return filter_return + #ns_log notice "PREAUTH returns filter_break" + return filter_break } :public object method postauth args { - #:log "postauth" + #ns_log notice "POSTAUTH returns filter_break" + return filter_break + } + + :public object method trace args { + #:log "trace" + #ns_log notice "TRACE handles request" + #:incoming_request + #ns_log notice "TRACE returns filter_return" return filter_return } @@ -122,7 +127,15 @@ set except_RE [dict get ${:allowed_command} $cmd_name] #ns_log notice "--cluster [list regexp $except_RE $cmd] -> [regexp $except_RE $cmd]" set allowed [expr {$except_RE eq "" || ![regexp $except_RE $cmd]}] - } else { + } elseif {[nsf::is object $cmd_name] + && ([$cmd_name ::nsf::methods::object::info::hastype acs::Cache] + || [$cmd_name ::nsf::methods::object::info::hastype acs::LockfreeCache])} { + # + # Allow operations on cache objects (instances of 
acs::Cache or acs::LockfreeCache) + # + ns_log notice "--cluster acs_cache operation: $cmd" + set allowed 1 + } else { set allowed 0 } return $allowed @@ -134,9 +147,17 @@ :public object method incoming_request {} { set cmd [ns_queryget cmd] set addr [lindex [ns_set iget [ns_conn headers] x-forwarded-for] end] + set sender [ns_set iget [ns_conn headers] host] + nsv_set cluster $sender-update [clock clicks -milliseconds] + nsv_incr cluster $sender-count if {$addr eq ""} {set addr [ns_conn peeraddr]} - #ns_log notice "--cluster got cmd='$cmd' from $addr" + ns_log notice "--cluster got cmd='$cmd' from $addr // sender $sender" ad_try { + #ns_logctl severity Debug(connchan) on + #ns_logctl severity Debug(request) on + #ns_logctl severity Debug(ns:driver) on + #ns_logctl severity Debug on + + set result [::acs::Cluster execute [ns_conn peeraddr] $cmd] } on error {errorMsg} { ns_log notice "--cluster error: $errorMsg" @@ -148,7 +169,7 @@ } # - # Handling outgoing requests + # Handling incoming requests from host # :public object method execute {host cmd} { if {![info exists :allowed_host($host)]} { @@ -164,7 +185,7 @@ } } if {[::acs::Cluster allowed_command $cmd]} { - ns_log notice "--cluster executes command '$cmd' from host $host" + ns_log notice "--cluster executes command '$cmd' from host $host port [ns_conn peerport]" return [eval $cmd] } error "command '$cmd' from host $host not allowed" @@ -174,6 +195,9 @@ # # Send requests to all cluster nodes. 
# + if {[ns_ictl epoch] > 0} { + catch {::throttle do incr ::count(cluster:broadcast)} + } # Small optimization for cachingmode "none": no need to # send cache flushing requests to nodes, when there is no @@ -189,8 +213,18 @@ return } - foreach server [:info instances] { - $server message {*}$args + if {[ns_ictl epoch] > 0} { + foreach server [:info instances] { + catch {::throttle do incr ::count(cluster:message)} + set t0 [clock clicks -microseconds] + $server message {*}$args + set ms [expr {([clock clicks -microseconds] - $t0)/1000}] + catch {::throttle do incr ::agg_time(cluster:message) $ms} + } + } else { + foreach server [:info instances] { + $server message {*}$args + } } } @@ -214,6 +248,41 @@ } } + :public object method check_nodes {} { + # + # For the time being (testing only) just measure some + # times from the canonical server with hardcoded locations + # + if {[ad_canonical_server_p]} { + ns_log notice "-------check nodes" + ::acs::CS_127.0.0.1_8101 message set x ns_http + ::acs::CS_127.0.0.1_8444 message set x ns_https + ::acs::CS_127.0.0.1_8101 message -delivery connchan set x ns_connchan + ::acs::CS_127.0.0.1_8444 message -delivery connchan set x https-connchan + ::acs::CS_127.0.0.1_8101 message -delivery udp set x udp + } + # foreach node [::acs::Cluster info instances] { + # if {[$node require_connchan_channel]} { + # if {$node eq "::acs::CS_127.0.0.1_8101"} { + # #ns_log notice "[self] check_node $node is connected [$node cget -chan]" + # #ns_logctl severity Debug(connchan) on + # #ns_logctl severity Debug(request) on + # #ns_logctl severity Debug(ns:driver) on + # #ns_logctl severity Debug on + # $node connchan_message set ok 123 + # } + # } else { + # # + # # We see a warning message in the log file, when + # # the server cannot connect to the node. 
+ # # + # #ns_log notice "[self] check_node $node is not connected" + # } + # } + set :to [::after 1000 [list [self] check_nodes]] + + } + :public object method register_nodes {} { # # Register the defined cluster nodes @@ -238,9 +307,10 @@ # ones, which are different from the current host (the # peer hosts). # - foreach hostport [server_cluster_all_hosts] { + foreach location [server_cluster_all_hosts] { + ns_log notice "creating ::acs::Cluster on $location" try { - server_cluster_get_config $hostport + server_cluster_get_config $location } on ok {config} { } on error {errorMsg} { - ns_log notice "ignore $hostport (server_cluster_get_config returned $errorMsg)" + ns_log notice "ignore $location (server_cluster_get_config returned $errorMsg)" @@ -253,16 +323,19 @@ ns_log debug "Cluster: server $host $port is no cluster peer" continue } - try { - ns_connchan connect $host $port - } on error {} { - ns_log notice "Cluster: server $host $port is not available" - continue - } on ok {chan} { - ns_connchan close $chan - } + # try { + # ns_logctl severity Debug(connchan) on + # ns_connchan connect $host $port + # } on error {} { + # ns_log notice "Cluster: server $host $port is not available" + # continue + # } on ok {chan} { + # ns_connchan close $chan + # } - ns_log debug "Cluster: server $host $port is an available cluster peer" + # ns_log debug "Cluster: server $host $port is an available cluster peer" + ns_log notice "call create ::acs::Cluster create CS_${host}_${port}" + ::acs::Cluster create CS_${host}_${port} \ -proto $proto \ -host $host \ @@ -271,17 +344,144 @@ } } } + :method name {} { + return ${:proto}://${:host}:${:port} + } - :public method message args { - :log "--cluster outgoing request to ${:proto}://${:host}:${:port} // $args" + :public method require_connchan_channel {} { + # + # Make sure that a connchan channel to this node is open. Returns 1 when a channel is available, 0 otherwise. + # + if {![info exists :chan]} { + set tlsOption [expr {${:proto} in {https} ? 
"-tls" : ""}] + try { + set :retry 0 + ns_connchan connect -timeout 10ms {*}$tlsOption ${:host} ${:port} + } on ok {result} { + set :chan $result + ns_log notice "-cluster: [:name] connected - channel ${:chan}" + } on error {errorMsg} { + ns_log warning "-cluster: [:name] can not connect: $errorMsg" + } + } + return [info exists :chan] + } + :public method has_channel {} { + return [info exists :chan] + } + + :method connchan_retry_message {args} { + # + # Make a single retry to send an HTTP message to this node + # and return its full HTTP response on success. + # + + # + # Cleanup old connection + # + try { + ns_connchan close ${:chan} + } on error {errorMsg} { + ns_log notice "... connchan ${:chan} CLOSE returns error $errorMsg, giving up" + return + } + unset -nocomplain :chan + # + # Create a new connection, but note the retry mode to avoid + # endless retries for one message + # + #ns_log notice "... connchan ${:chan} CLOSED" + if {[:require_connchan_channel]} { + set :retry 1 + ns_log notice "-cluster: [self] connchan RETRY channel ${:chan}" + :connchan_message {*}$args + } + } + + :method connchan_message {args} { + # + # Send an HTTP message to this node and return its full HTTP + # response on success. + # + set reply "" + #set t0 [clock clicks -microseconds] + if {[:require_connchan_channel]} { + set message "GET /${:url}?cmd=[ns_urlencode $args] HTTP/1.1\r\nHost:localhost\r\n\r\n" + #ns_log notice "-cluster: send $message to ${:proto}://${:host}:${:port}" + + try { + ns_connchan write ${:chan} $message + #set t2 [clock clicks -microseconds] + #ns_log notice "... message sent" + set reply [ns_connchan read ${:chan}] + #set t3 [clock clicks -microseconds] + + #ns_log notice "... 
reply $reply" + } on error {errorMsg} { + #ns_log notice "-cluster: send $args to ${:proto}://${:host}:${:port} returned ERROR $::errorInfo $errorMsg" + ns_log notice "-cluster: send connchan ${:chan} error $errorMsg RETRY ${:retry}" + if {${:retry} == 0} { + set reply [:connchan_retry_message {*}$args] + } + } on ok {result} { + set :retry 0 + #ns_log notice "-cluster: [:name] sent OK " \ + "total [expr {([clock clicks -microseconds] - $t0)/1000.0}]ms" \ + "write [expr {($t2 - $t0)/1000.0}]ms" \ + "read [expr {($t3 - $t2)/1000.0}]ms" \ + } + } + return $reply + } + + :method ns_http_message args { + #:log "--cluster outgoing request to ${:proto}://${:host}:${:port} // $args" + try { ns_http run ${:proto}://${:host}:${:port}/${:url}?cmd=[ns_urlencode $args] } on error {errorMsg} { ns_log warning "-cluster: send message to ${:proto}://${:host}:${:port}/${:url}?cmd=[ns_urlencode $args] failed: $errorMsg" + set result "" } on ok {result} { - ns_log notice "-cluster: response $result" + #ns_log notice "-cluster: response $result" } + return $result } + + :method udp_message args { + #:log "--cluster outgoing request to ${:proto}://${:host}:${:port} // $args" + try { + ns_udp ${:host} ${:port} "GET /${:url}?cmd=[ns_urlencode $args] HTTP/1.0\n\n" + } on error {errorMsg} { + ns_log warning "-cluster: send message to ${:proto}://${:host}:${:port}/${:url}?cmd=[ns_urlencode $args] failed: $errorMsg" + set result "" + } on ok {result} { + #ns_log notice "-cluster: response $result" + } + return $result + } + + :public method message {{-delivery ns_http} args} { + # + # Send a command by different means to the node server for + # intra-server talk. 
+ # + # Valid delivery methods are + # - ns_http (for HTTP and HTTPS) + # - connchan (for HTTP and HTTPS) + # - udp (plain UDP only) + # + #:log "--cluster outgoing request to [:name] // $args" + set t0 [clock clicks -microseconds] + switch $delivery { + ns_http - + connchan - + udp {set result [:${delivery}_message {*}$args]} + default {error "unknown delivery method '$delivery'"} + } + ns_log notice "-cluster: [:name] $args sent" \ + "total [expr {([clock clicks -microseconds] - $t0)/1000.0}]ms" + return $result + } } }