动手造轮子:实现一个简单的 EventBus

动手造轮子:实现一个简单的 EventBus,第1张

概述动手轮子:实现一个简单的 EventBus Intro EventBus 是一种事件发布订阅模式,通过 EventBus 我们可以很方便的实现解耦,将事件的发起和事件的处理的很好的分隔开来,很好的实 动手造轮子:实现一个简单的 EventBusIntro

EventBus 是一种事件发布订阅模式,通过 EventBus 我们可以很方便的实现解耦,将事件的发起和事件的处理的很好的分隔开来,很好的实现解耦。 微软官方的示例项目 EShopOnContainers 也有在使用 EventBus 。

这里的 EventBus 实现也是参考借鉴了微软 eShopOnContainers 项目。

EventBus 处理流程:

微服务间使用 EventBus 实现系统间解耦:

借助 EventBus 我们可以很好的实现组件之间,服务之间,系统之间的解耦以及相互通信的问题。

起初觉得 EventBus 和 MQ 其实差不多嘛,都是通过异步处理来实现解耦合,高性能。后来看到了下面这张图才算明白为什么要用 EventBus 以及 EventBus 和 MQ 之间的关系,EventBus 是抽象的,可以用MQ来实现 EventBus.

为什么要使用 EventBus解耦合(轻松的实现系统间解耦)高性能可扩展(每一个事件都是简单独立且不可更改的对象,只需要保存新增的事件,不涉及其他的变更删除 *** 作)系统审计(每一个事件都是不可变更的,每一个事件都是可追溯的)...EventBus 整体架构:IEventBase :所有的事件应该实现这个接口,这个接口定义了事件的唯一ID EventID 和事件发生的事件 EventAt

IEventHandler:定义了一个 Handle 方法来处理相应的事件

IEventStore:所有的事件的处理存储,保存事件的IEventHandler,一般不会直接 *** 作,通过 EventBus 的订阅和取消订阅来 *** 作 EventStore

IEventBus:用来发布/订阅/取消订阅事件,并将事件的某一个 IEventHandler 保存到 EventStore 或从 EventStore 中移除

使用示例

来看一个使用示例,完整代码示例:

internal class EventTest{    public static voID Maintest()    {        var eventBus = DependencyResolver.Current.ResolveService<IEventBus>();        eventBus.Subscribe<CounterEvent,CounterEventHandler1>();        eventBus.Subscribe<CounterEvent,CounterEventHandler2>();        eventBus.Subscribe<CounterEvent,DelegateEventHandler<CounterEvent>>();        eventBus.Publish(new CounterEvent { Counter = 1 });        eventBus.Unsubscribe<CounterEvent,CounterEventHandler1>();        eventBus.Unsubscribe<CounterEvent,DelegateEventHandler<CounterEvent>>();        eventBus.Publish(new CounterEvent { Counter = 2 });    }}internal class CounterEvent : EventBase{    public int Counter { get; set; }}internal class CounterEventHandler1 : IEventHandler<CounterEvent>{    public Task Handle(CounterEvent @event)    {        LogHelper.GetLogger<CounterEventHandler1>().Info($"Event Info: {@event.ToJson()},Handler Type:{GetType().Fullname}");        return Task.CompletedTask;    }}internal class CounterEventHandler2 : IEventHandler<CounterEvent>{    public Task Handle(CounterEvent @event)    {        LogHelper.GetLogger<CounterEventHandler2>().Info($"Event Info: {@event.ToJson()},Handler Type:{GetType().Fullname}");        return Task.CompletedTask;    }}
具体实现

EventStoreInMemory 实现:

EventStoreInMemory 是 IEventStore 将数据放在内存中的实现,使用了 ConcurrentDictionary 以及 HashSet 来尽可能的保证高效,具体实现代码如下:

public class EventStoreInMemory : IEventStore{    private Readonly ConcurrentDictionary<string,HashSet<Type>> _eventHandlers = new ConcurrentDictionary<string,HashSet<Type>>();    public bool AddSubscription<TEvent,TEventHandler>()        where TEvent : IEventBase        where TEventHandler : IEventHandler<TEvent>    {        var eventKey = GetEventKey<TEvent>();        if (_eventHandlers.ContainsKey(eventKey))        {            return _eventHandlers[eventKey].Add(typeof(TEventHandler));        }        else        {            return _eventHandlers.TryAdd(eventKey,new HashSet<Type>()            {                typeof(TEventHandler)            });        }    }    public bool Clear()    {        _eventHandlers.Clear();        return true;    }    public ICollection<Type> GetEventHandlerTypes<TEvent>() where TEvent : IEventBase    {        if(_eventHandlers.Count == 0)            return  new Type[0];        var eventKey = GetEventKey<TEvent>();        if (_eventHandlers.TryGetValue(eventKey,out var handlers))        {            return handlers;        }        return new Type[0];    }    public string GetEventKey<TEvent>()    {        return typeof(TEvent).Fullname;    }    public bool HasSubscriptionsForEvent<TEvent>() where TEvent : IEventBase    {        if(_eventHandlers.Count == 0)            return false;        var eventKey = GetEventKey<TEvent>();        return _eventHandlers.ContainsKey(eventKey);    }    public bool RemoveSubscription<TEvent,TEventHandler>()        where TEvent : IEventBase        where TEventHandler : IEventHandler<TEvent>    {        if(_eventHandlers.Count == 0)            return false;        var eventKey = GetEventKey<TEvent>();        if (_eventHandlers.ContainsKey(eventKey))        {            return _eventHandlers[eventKey].Remove(typeof(TEventHandler));        }        return false;    }}

EventBus 的实现,从上面可以看到 EventStore 保存的是 IEventHandler 对应的 Type,在 Publish 的时候根据 Type 从 IoC 容器中取得相应的 Handler 即可,如果没有在 IoC 容器中找到对应的类型,则会尝试创建一个类型实例,然后调用 IEventHandlerHandle 方法,代码如下:

/// <summary>/// EventBus in process/// </summary>public class EventBus : IEventBus{    private static Readonly ILogHelperLogger Logger = Helpers.LogHelper.GetLogger<EventBus>();    private Readonly IEventStore _eventStore;    private Readonly IServiceProvIDer _serviceProvIDer;    public EventBus(IEventStore eventStore,IServiceProvIDer serviceProvIDer = null)    {        _eventStore = eventStore;        _serviceProvIDer = serviceProvIDer ?? DependencyResolver.Current;    }    public bool Publish<TEvent>(TEvent @event) where TEvent : IEventBase    {        if (!_eventStore.HasSubscriptionsForEvent<TEvent>())        {            return false;        }        var handlers = _eventStore.GetEventHandlerTypes<TEvent>();        if (handlers.Count > 0)        {            var handlerTasks = new List<Task>();            foreach (var handlerType in handlers)            {                try                {                    if (_serviceProvIDer.GetServiceOrCreateInstance(handlerType) is IEventHandler<TEvent> handler)                    {                        handlerTasks.Add(handler.Handle(@event));                    }                }                catch (Exception ex)                {                    Logger.Error(ex,$"handle event [{_eventStore.GetEventKey<TEvent>()}] error,eventHandlerType:{handlerType.Fullname}");                }            }            handlerTasks.WhenAll().ConfigureAwait(false);            return true;        }        return false;    }    public bool Subscribe<TEvent,TEventHandler>()        where TEvent : IEventBase        where TEventHandler : IEventHandler<TEvent>    {        return _eventStore.AddSubscription<TEvent,TEventHandler>();    }    public bool Unsubscribe<TEvent,TEventHandler>()        where TEvent : IEventBase        where TEventHandler : IEventHandler<TEvent>    {        return _eventStore.RemoveSubscription<TEvent,TEventHandler>();    }}
项目实例

来看一个实际的项目中的使用,在我的活动室预约项目中有一个公告的模块,访问公告详情页面,这个公告的访问次数加1,把这个访问次数加1改成了用 EventBus 来实现,实际项目代码:https://github.com/WeihanLi/ActivityReservation/blob/67e2cb8e92876629a7af6dc051745dd8c7e9faeb/ActivityReservation/Startup.cs

定义 Event 以及 EventHandler
public class NoticeVIEwEvent : EventBase{    public GuID NoticeID { get; set; }    // UserID    // IP    // ...}public class NoticeVIEwEventHandler : IEventHandler<NoticeVIEwEvent>{    public async Task Handle(NoticeVIEwEvent @event)    {        await DependencyResolver.Current.TryInvokeServiceAsync<ReservationDbContext>(async dbContext =>        {            var notice = await dbContext.Notices.FindAsync(@event.NoticeID);            notice.NoticeVisitCount += 1;            await dbContext.SaveChangesAsync();        });    }}

这里的 Event 只定义了一个 NoticeID ,其实也可以把请求信息如IP/UA等信息加进去,在 EventHandler里处理以便日后数据分析。

注册 EventBus 相关服务以及 EventHandlers
services.AddSingleton<IEventBus,EventBus>();services.AddSingleton<IEventStore,EventStoreInMemory>();//register EventHandlersservices.AddSingleton<NoticeVIEwEventHandler>();
订阅事件
public voID Configure(IApplicationBuilder app,IHostingEnvironment env,ILoggerFactory loggerFactory,IEventBus eventBus){    eventBus.Subscribe<NoticeVIEwEvent,NoticeVIEwEventHandler>();     // ...}
发布事件
eventBus.Publish(new NoticeVIEwEvent { NoticeID = notice.NoticeID });
Referencehttps://github.com/dotnet-architecture/eShopOnContainershttps://docs.microsoft.com/zh-cn/previous-versions/msp-n-p/jj591559(v=pandp.10)https://docs.microsoft.com/en-us/dotnet/standard/microservices-architecture/multi-container-microservice-net-applications/integration-event-based-microservice-communicationshttps://docs.microsoft.com/en-us/dotnet/standard/microservices-architecture/multi-container-microservice-net-applications/subscribe-eventshttps://github.com/sheng-jie/EventBushttps://github.com/WeihanLi/ActivityReservation 总结

以上是内存溢出为你收集整理的动手造轮子:实现一个简单的 EventBus全部内容,希望文章能够帮你解决动手造轮子:实现一个简单的 EventBus所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://www.outofmemory.cn/langs/1213881.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-04
下一篇 2022-06-04

发表评论

登录后才能评论

评论列表(0条)

保存