make message_bus use regional suffixes

closes FOO-2054
flag=none

TEST PLAN:
  1) enable AUA writing in a multi-region environment
  2) each region should be successfully writing to it's
     own regionally-suffixed namespace

Change-Id: Ia8d25c4f29c5b9081890081395c62b59a8a4a2c3
Reviewed-on: https://gerrit.instructure.com/c/canvas-lms/+/266977
Tested-by: Service Cloud Jenkins <svc.cloudjenkins@instructure.com>
Reviewed-by: Cody Cutrer <cody@instructure.com>
QA-Review: Cody Cutrer <cody@instructure.com>
Product-Review: Cody Cutrer <cody@instructure.com>
This commit is contained in:
Ethan Vizitei 2021-06-11 14:54:37 -05:00
parent 5d33184845
commit bf9b81a864
3 changed files with 111 additions and 5 deletions

View File

@ -55,10 +55,11 @@ module MessageBus
# and build a new producer rather than using a process-cached one,
# even if such a producer is available.
def self.producer_for(namespace, topic_name, force_fresh: false)
check_conn_pool(["producers", namespace, topic_name], force_fresh: force_fresh) do
ns = MessageBus::Namespace.build(namespace)
check_conn_pool(["producers", ns.to_s, topic_name], force_fresh: force_fresh) do
Bundler.require(:pulsar)
::MessageBus::CaCert.ensure_presence!(self.config)
topic = self.topic_url(namespace, topic_name)
topic = self.topic_url(ns, topic_name)
self.client.create_producer(topic)
end
end
@ -73,10 +74,11 @@ module MessageBus
# and build a new consumer rather than using a process-cached one,
# even if such a consumer is available.
def self.consumer_for(namespace, topic_name, subscription_name, force_fresh: false)
check_conn_pool(["consumers", namespace, topic_name, subscription_name], force_fresh: force_fresh) do
ns = MessageBus::Namespace.build(namespace)
check_conn_pool(["consumers", ns.to_s, topic_name, subscription_name], force_fresh: force_fresh) do
Bundler.require(:pulsar)
::MessageBus::CaCert.ensure_presence!(self.config)
topic = topic_url(namespace, topic_name)
topic = topic_url(ns, topic_name)
consumer_config = Pulsar::ConsumerConfiguration.new({})
consumer_config.subscription_initial_position = :earliest
self.client.subscribe(topic, subscription_name, consumer_config)
@ -84,6 +86,7 @@ module MessageBus
end
def self.topic_url(namespace, topic_name, app_env=Canvas.environment)
ns = MessageBus::Namespace.build(namespace)
app_env = (app_env || "development").downcase
conf_hash = self.config
# by using the application env in the topic name, we can
@ -91,7 +94,7 @@ module MessageBus
# like test/beta/edge whatever and not have to provision
# other overhead to separate them or deal with the confusion of shared
# data in a single topic.
"persistent://#{conf_hash['PULSAR_TENANT']}/#{namespace}/#{app_env}-#{topic_name}"
"persistent://#{conf_hash['PULSAR_TENANT']}/#{ns.to_s}/#{app_env}-#{topic_name}"
end
##

View File

@ -0,0 +1,55 @@
# frozen_string_literal: true
# Copyright (C) 2021 - present Instructure, Inc.
#
# This file is part of Canvas.
#
# Canvas is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, version 3 of the License.
#
# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
##
# Namespaces in a pulsar instance can be shared
# across clusters for topic replication, but need
# distinct namespaces in each cluster if you intend
# to support disjoint topic sets in each "same-purposed"
# namespace. This class exists to ensure that if
# canvas has a regional code, it gets included as part of
# namespace specification in working with the message bus library.
#
# In an open-source/single-tenant canvas installation,
# the regional config will be nil, and no suffix need be applied.
module MessageBus
class Namespace
def initialize(namespace_base_string)
@base_string = namespace_base_string
@suffix = Canvas.region_code
end
def self.build(namespace)
return namespace if namespace.is_a?(::MessageBus::Namespace)
self.new(namespace)
end
# If somehow the consuming code is already injecting
# the topic suffix, or if we mistakenly wrap a string
# multiple times, don't continue to add suffixes.
def to_s
return @base_string if @suffix.nil?
extension = "-#{@suffix}"
return @base_string if @base_string[(-1*(extension.length))..] == extension
@base_string + extension
end
end
end

View File

@ -0,0 +1,48 @@
# frozen_string_literal: true
#
# Copyright (C) 2021 - present Instructure, Inc.
#
# This file is part of Canvas.
#
# Canvas is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, version 3 of the License.
#
# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
require 'spec_helper'
describe MessageBus::Namespace do
describe "suffix expansion" do
it "adds no suffix when region code absent" do
expect(Canvas.region_code).to be_nil
ns = MessageBus::Namespace.build("ns-name")
expect(ns.to_s).to eq("ns-name")
end
it "appends a cluster suffix when region present" do
allow(Canvas).to receive(:region_code).and_return("abc")
ns = MessageBus::Namespace.build("ns-name")
expect(ns.to_s).to eq("ns-name-abc")
end
it "will not repeatedly extend a namespace name" do
allow(Canvas).to receive(:region_code).and_return("abc")
ns = MessageBus::Namespace.build("ns-name-abc")
expect(ns.to_s).to eq("ns-name-abc")
end
it "returns the same namespace object if already built" do
ns = MessageBus::Namespace.build("ns-name")
ns2 = MessageBus::Namespace.build(ns)
expect(ns.to_s).to eq(ns2.to_s)
end
end
end