Trending

#ProtoActor

Latest posts tagged with #ProtoActor on Bluesky

Latest Top
Trending

Posts tagged #ProtoActor

Just a moment...

Explore how EventStream in Proto.Actor enhances publish-subscribe communication between actors. Learn to optimize message passing and system design effectively. #ProtoActor #PubSub

1 0 0 0
Preview
Building a TCP server with Proto.Actor: Exploring Actor Model with .NET In a previous article, I demonstrated basic usage of Proto.Actor — actor model framework for .NET. This article builds a more complex example: a TCP socket server using three actors to handle…

Learn about building a TCP server with Proto.Actor in .NET! This guide explores the Actor Model's power for handling concurrent requests efficiently. #DotNet #ProtoActor

0 0 0 0
Preview
Construindo um Servidor TCP com Proto.Actor: Explorando o Modelo de Atores no .NET ## Introdução Em artigos anterior, demonstrei o uso básico do **Proto.Actor** , um framework para o Modelo de Atores em .NET. Neste artigo, vamos construir um exemplo mais complexo: um servidor TCP usando três atores para gerenciar conexões, recepção de bytes e processamento de dados. ## Visão Geral do Projeto ### Atores Principais 1. **WaitForTcpConnectionActor** : * Escuta por novas conexões TCP. * Cria uma instância de `ReceiveBytesActor` para cada conexão. 2. **ReceiveBytesActor** : * Recebe bytes do socket. * Cria uma instância de `ProcessActor` para desserializar, logar os dados e reiniciá-lo até 3 vezes em caso de falha. 3. **ProcessActor** : * Desserializa os bytes recebidos em um objeto `Sample` e o imprime no console. ### Requisitos * **.NET 8+** * Pacotes NuGet: * Proto.Actor ### Iniciando o Sistema de Atores Configuração do sistema de atores para criar o `WaitForTcpConnectionActor` e encerrá-lo ao pressionar `Ctrl+C`: using Proto; using TcpServer; var system = new ActorSystem(); var cancellationTokenSource = new CancellationTokenSource(); Console.CancelKeyPress += (_, _) => { cancellationTokenSource.Cancel(); }; system.Root.Spawn(Props.FromProducer(() => new WaitForTcpConnectionActor(9091))); while (!cancellationTokenSource.IsCancellationRequested) { await Task.Delay(1_000); } await system.ShutdownAsync(); ## Ator de Espera por Conexão TCP O Modelo de Atores funciona com os atores se comunicando por meio de mensagens. Vamos usar mensagens fornecido como Started e Terminated, além de nossas proprias messagens como a WaitForNextConnection. Conforme mostrado no artigo anterior, um ator pode ser definido da seguinte forma: public class WaitForTcpConnectionActor(int port) : IActor { public async Task ReceiveAsync(IContext context) { } } ### Iniciando o Listener TCP O primeiro passo é iniciar o servidor TCP usando a mensagem `Started`: public class WaitForTcpConnectionActor(int port) : IActor { private TcpListener? _listener; public async Task ReceiveAsync(IContext context) { if (context.Message is Started) { Console.WriteLine("Escutando na porta 9091"); _listener = TcpListener.Create(port); _listener.Start(); } } } ### Aguardando Conexões O Modelo de Atores funciona com os atores se comunicando por meio de mensagens. Vamos usar mensagens integradas como `Started` e Terminated, além de uma mensagem personalizada chamada `WaitForNextConnection`. Conforme mostrado no artigo anterior, um ator pode ser definido da seguinte forma: public class WaitForTcpConnectionActor(int port) : IActor { public async Task ReceiveAsync(IContext context) { } } ## Iniciando o TCP Listener O primeiro passo é iniciar nosso servidor TCP. Para isso, usamos a mensagem `Started`: public class WaitForTcpConnectionActor(int port) : IActor { private TcpListener? _listener; public async Task ReceiveAsync(IContext context) { if(context.Message is Started) { Console.WriteLine("Listening on port 9091"); _listener = TcpListener.Create(port); _listener.Start(); } } } Em seguida, aguarde uma conexão, enviando uma mensagem ao próprio ator: public class WaitForTcpConnectionActor(int port) : IActor { private TcpListener? _listener; public async Task ReceiveAsync(IContext context) { if(context.Message is Started) { _listener = TcpListener.Create(port); _listener.Start(); context.Send(context.Self, new WaitForNextConnection()); } } } ### Aguardando uma Conexão TCP Agora que estamos escutando conexões, precisamos aceitá-las e criar novos atores para processar cada uma: public class WaitForTcpConnectionActor(int port) : IActor { ... public async Task ReceiveAsync(IContext context) { if(context.Message is Started) { ... } else if(context.Message is WaitForNextConnection) { var socket = await _listener!.AcceptSocketAsync(cancellationToken); var actor = context.Spawn(Props.FromProducer(() => new ReceiveBytesActor())) .WithChildSupervisorStrategy(new OneForOneStrategy( (_, exception) => { Console.WriteLine("Error: {0}", exception); return SupervisorDirective.Restart; }, 3, TimeSpan.FromSeconds(1)));; context.Send(actor, new SocketAccepted(socket)); context.Send(context.Self, new WaitForNextConnection()); } } } #### Supervisão de Atores Configuramos uma estratégia OneForOneStrategy para supervisionar instâncias de `ReceiveBytesActor`: Se um ator filho falhar, ele será reiniciado até 3 vezes em 1 segundo. Isso garante que erros transitórios (ex: mensagens malformadas) não derrubem todo o sistema. ### Notificando Conclusão Quando o processamento é concluído, o ator pai recebe a mensagem `ProcessCompleted`. Isso sinaliza ao pai para parar explicitamente o ator filho, garantindo liberação adequada de recursos e evitando vazamentos de memória. public class WaitForTcpConnectionActor(int port) : IActor { ... public async Task ReceiveAsync(IContext context) { if(context.Message is Started) { ... } else if(context.Message is { Message: Terminated, Sender: not null })) { _listener?.Dispose(); } else if(context.Message is ProcessCompleted) { await context.StopAsync(Sender); } else if(context.Message is WaitForNextConnection) { ... } } } #### Liberação de Recursos Quando uma conexão é processada: * A mensagem ProcessCompleted indica conclusão. * O ator pai para o filho e aciona a liberação de recursos. ### Desligamento Elegante Ao desligar o sistema de atores, devemos liberar corretamente o `TcpListener` para evitar vazamentos de recursos: public class WaitForTcpConnectionActor(int port) : IActor { ... public async Task ReceiveAsync(IContext context) { if(context.Message is Started) { ... } else if(context.Message is { Message: Terminated, Sender: not null })) { _listener?.Dispose(); } else if(context.Message is ProcessCompleted) { .... } else if(context.Message is WaitForNextConnection) { ... } } } ## Recebendo Bytes O próximo passo é receber bytes de um socket. ### Tratando `SocketAccepted` Quando uma nova conexão é aceita, o ator armazena o socket e lê os bytes disponíveis: public class ReceiveBytesActor : IActor { private Socket? _socket; private byte[]? _buffer; public async Task ReceiveAsync(IContext context) { if(context.Message is SocketAccepted socket) { _socket = socket; _buffer = new byte[_socket.Available]; await _socket.ReceiveAsync(_buffer); var props = Props.FromProducer(() => new ProcessActor()); var actor = context.SpawnNamed(props, "json-serializer"); context.Send(actor, new SocketReceived(_buffer!)); } } } ### Notificando Conclusão Após o processamento, o ator para o `ProcessActor` filho e notifica seu pai para liberar recursos: public class ReceiveBytesActor : IActor { public async Task ReceiveAsync(IContext context) { if(context.Message is SocketAccepted socket) { // Lógica de recepção de bytes } else if(context.Message is ProcessCompleted) { await context.StopAsync(Sender); context.Send(context.Parent!, new ProcessCompleted()); } } } ### Encerramento Elegante do Socket Quando o ator é encerrado, ele descarta o socket e para todos os atores filhos para evitar vazamentos de memória: public class ReceiveBytesActor : IActor { public async Task ReceiveAsync(IContext context) { if(context.Message is Terminated) { _buffer = null; _socket?.Dispose(); await context.Children.StopMany(context); } else if(context.Message is SocketAccepted socket) { // Lógica de recepção de bytes } else if(context.Message is ProcessCompleted) { // Notificação de conclusão } } } ### Reenviando o Buffer Recebido Se o `ProcessActor` falhar e reiniciar, o `ReceiveBytesActor` reenvia o buffer armazenado para reprocessamento: public class ReceiveBytesActor : IActor { public async Task ReceiveAsync(IContext context) { if(context.Message is Terminated) { // Liberação de recursos } else if(context.Message is SocketAccepted socket) { // Lógica de recepção de bytes } else if(context.Message is ProcessCompleted) { // Notificação de conclusão } else if(context.Message is ResendBufferReceived) { context.Send(Sender, new ResendBufferReceived(_buffer!)); } } } ## Processamento de Dados O último ator **desserializa e registra os dados** recebidos. ### Mensagem `BufferReceived` A mensagem `BufferReceived` contém os bytes brutos do socket. Este ator desserializa os dados em um objeto `Sample` e imprime no console. Após o processamento, ele notifica o **ator pai** (`ReceiveBytesActor`) por meio da mensagem `ProcessCompleted` para liberar recursos: public class ProcessActor : IActor { public Task ReceiveAsync(IContext context) { if (context.Message is BufferReceived socketReceived) { var json = JsonSerializer.Deserialize<Sample>(socketReceived.Data)!; Console.WriteLine("Recebida com ID: {0} e nome: {1}", json.Id, json.Name); context.Send(context.Parent!, new ProcessCompleted(context.Self)); } return Task.CompletedTask; } } ### Reinicialização (`Restarting`) Quando um ator é reiniciado, o `Proto.Actor` envia a mensagem `Restarting` para o próprio ator. Isso permite que o ator notifique seu ator pai para retransmitir a mensagem original (ou estado), garantindo que o ator reiniciado possa reprocessá-la: public class ProcessActor : IActor { public Task ReceiveAsync(IContext context) { if (context.Message is Restarting) { context.Send(context.Parent!, new ResendBufferReceived()); } else if (context.Message is BufferReceived socketReceived) { ... } return Task.CompletedTask; } } ## Cliente TCP Implementação de um cliente TCP simples que envia entrada do usuário (convertida em JSON): using System.Net.Sockets; using System.Text.Json; using TcpServer.Client; var id = 0; while (true) { Console.Write("Digite um nome (q para sair/f para não serializar): "); var name = Console.ReadLine(); if (string.IsNullOrWhiteSpace(name)) { continue; } if (name == "q") { break; } try { var connection = new TcpClient(); await connection.ConnectAsync("localhost", 9091); var stream = connection.GetStream(); if (name == "f") { await stream.WriteAsync(new[] { (byte)'f' }); } else { await JsonSerializer.SerializeAsync(stream, new Sample { Id = id++, Name = name }); } connection.Close(); } catch (Exception e) { Console.WriteLine("Erro: {0}", e.Message); } } ## Conclusão O **Proto.Actor** é uma ferramenta poderosa para construir sistemas tolerantes a falhas, demonstrando como o Modelo de Atores simplifica concorrência, gerenciamento de recursos e recuperação de erros. Neste exemplo, exploramos: * **Estratégias de Supervisão** : Uso de `OneForOneStrategy` para reiniciar atores falhos até 3 vezes. * **Gerenciamento do Ciclo de Vida dos Atores** : Tratamento de mensagens como `Started`, `Terminated` e `Restarting`. * **Resiliência de Mensagens** : Retentativas de operações falhas via `ResendSocketAccepted` e `ProcessCompleted`. ### Considerações para Produção 1. **Leitura de Bytes** : * O método `_socket.Available` é útil para demonstrações, mas pode ser inconsistente em produção. Use `MemoryStream` para lidar com dados de tamanho variável. 2. **Tratamento de Erros** : * Em sistemas reais, encapsule operações de socket em blocos `try-catch` e implemente logs detalhados. 3. **Liberação de Recursos** : * Use timeouts ou verificações de heartbeat para evitar conexões órfãs. ## Código Completo Repositório GitHub
0 0 0 0