RedisSubscribe.cs
7.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
using HslCommunication.Core.Net;
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;
namespace HslCommunication.Enthernet.Redis
{
/// <summary>
/// Redis协议的订阅操作,一个对象订阅一个或是多个频道的信息
/// </summary>
public class RedisSubscribe : NetworkXBase
{
#region Constructor
/// <summary>
/// 实例化一个发布订阅类的客户端,需要指定ip地址,端口,及订阅关键字
/// </summary>
/// <param name="ipAddress">服务器的IP地址</param>
/// <param name="port">服务器的端口号</param>
/// <param name="keys">订阅关键字</param>
public RedisSubscribe(string ipAddress, int port, string[] keys)
{
endPoint = new IPEndPoint(IPAddress.Parse(ipAddress), port);
keyWords = keys;
if (keys == null)
{
throw new Exception(StringResources.Language.KeyIsNotAllowedNull);
}
}
/// <summary>
/// 实例化一个发布订阅类的客户端,需要指定ip地址,端口,及订阅关键字
/// </summary>
/// <param name="ipAddress">服务器的IP地址</param>
/// <param name="port">服务器的端口号</param>
/// <param name="key">订阅关键字</param>
public RedisSubscribe(string ipAddress, int port, string key)
{
endPoint = new IPEndPoint(IPAddress.Parse(ipAddress), port);
keyWords = new string[] { key };
if (string.IsNullOrEmpty(key))
{
throw new Exception(StringResources.Language.KeyIsNotAllowedNull);
}
}
#endregion
#region Private Method
private OperateResult CreatePush()
{
CoreSocket?.Close();
OperateResult<Socket> connect = CreateSocketAndConnect(endPoint, 5000);
if (!connect.IsSuccess) return connect;
// 密码的验证
if (!string.IsNullOrEmpty(this.Password))
{
OperateResult check = Send(connect.Content, RedisHelper.PackStringCommand(new string[] { "AUTH", this.Password }));
if (!check.IsSuccess) return check;
OperateResult<byte[]> checkResult = RedisHelper.ReceiveCommand(connect.Content);
if (!checkResult.IsSuccess) return checkResult;
string msg = Encoding.UTF8.GetString(checkResult.Content);
if (!msg.StartsWith("+OK")) return new OperateResult(msg);
}
List<string> lists = new List<string>();
lists.Add("SUBSCRIBE");
lists.AddRange(keyWords);
OperateResult send = Send(connect.Content, RedisHelper.PackStringCommand(lists.ToArray()));
if (!send.IsSuccess) return send;
CoreSocket = connect.Content;
try
{
connect.Content.BeginReceive(new byte[0], 0, 0, SocketFlags.None, new AsyncCallback(ReceiveCallBack), connect.Content);
}
catch (Exception ex)
{
return new OperateResult(ex.Message);
}
return OperateResult.CreateSuccessResult();
}
private void ReceiveCallBack(IAsyncResult ar)
{
if (ar.AsyncState is Socket socket)
{
try
{
int receive = socket.EndReceive(ar);
OperateResult<byte[]> read = RedisHelper.ReceiveCommand(socket);
if (!read.IsSuccess)
{
SocketReceiveException(null);
return;
}
else
{
socket.BeginReceive(new byte[0], 0, 0, SocketFlags.None, new AsyncCallback(ReceiveCallBack), socket);
}
OperateResult<string[]> data = RedisHelper.GetStringsFromCommandLine(read.Content);
if (!data.IsSuccess)
{
LogNet?.WriteWarn(data.Message);
return;
}
if (data.Content[0].ToUpper() == "SUBSCRIBE")
{
return;
}
else if (data.Content[0].ToUpper() == "MESSAGE")
{
action?.Invoke(data.Content[1], data.Content[2]);
}
else
{
LogNet?.WriteWarn(data.Content[0]);
}
}
catch (ObjectDisposedException)
{
// 通常是主动退出
return;
}
catch (Exception ex)
{
SocketReceiveException(ex);
}
}
}
private void SocketReceiveException(Exception ex)
{
// 发生异常的时候需要进行重新连接
while (true)
{
if (ex != null) LogNet?.WriteException("Offline", ex);
Console.WriteLine(StringResources.Language.ReConnectServerAfterTenSeconds);
System.Threading.Thread.Sleep(this.reconnectTime);
if (CreatePush().IsSuccess)
{
Console.WriteLine(StringResources.Language.ReConnectServerSuccess);
break;
}
}
}
#endregion
#region Public Properties
/// <summary>
/// 如果Redis服务器设置了密码,此处就需要进行设置。必须在CreatePush方法调用前设置
/// </summary>
public string Password { get; set; }
#endregion
#region Public Method
/// <summary>
/// 创建数据推送服务
/// </summary>
/// <param name="pushCallBack">触发数据推送的委托</param>
/// <returns>是否创建成功</returns>
public OperateResult CreatePush(Action<string, string> pushCallBack)
{
action = pushCallBack;
return CreatePush();
}
/// <summary>
/// 关闭消息推送的界面
/// </summary>
public void ClosePush()
{
action = null;
CoreSocket?.Close();
}
#endregion
#region Private Member
private IPEndPoint endPoint; // 服务器的地址及端口信息
private string[] keyWords = null; // 缓存的订阅关键字
private Action<string, string> action; // 服务器推送后的回调方法
private int reconnectTime = 10000; // 重连服务器的时间
#endregion
#region Object Override
/// <summary>
/// 返回表示当前对象的字符串
/// </summary>
/// <returns>字符串信息</returns>
public override string ToString()
{
return $"RedisSubscribe[{endPoint}]";
}
#endregion
}
}