Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Presence channel handler #25

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions js-src/Channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@
}

here(callback: Function): Channel {
// TODO: implement
this.on('subscription_succeeded', (data) => {
callback(data)

Check warning on line 121 in js-src/Channel.ts

View check run for this annotation

Codecov / codecov/patch

js-src/Channel.ts#L120-L121

Added lines #L120 - L121 were not covered by tests
})

return this
}
Expand All @@ -126,7 +128,9 @@
* Listen for someone joining the channel.
*/
joining(callback: Function): Channel {
// TODO: implement
this.on('member_added', (data) => {
callback(data)

Check warning on line 132 in js-src/Channel.ts

View check run for this annotation

Codecov / codecov/patch

js-src/Channel.ts#L131-L132

Added lines #L131 - L132 were not covered by tests
})

return this
}
Expand All @@ -135,7 +139,9 @@
* Listen for someone leaving the channel.
*/
leaving(callback: Function): Channel {
// TODO: implement
this.on('member_removed', (data) => {
callback(data)

Check warning on line 143 in js-src/Channel.ts

View check run for this annotation

Codecov / codecov/patch

js-src/Channel.ts#L142-L143

Added lines #L142 - L143 were not covered by tests
})

return this
}
Expand Down
8 changes: 6 additions & 2 deletions src/ConnectionRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Aws\ApiGatewayManagementApi\ApiGatewayManagementApiClient;
use Aws\ApiGatewayManagementApi\Exception\ApiGatewayManagementApiException;
use GuzzleHttp\Exception\ClientException;
use Symfony\Component\HttpFoundation\Response;

class ConnectionRepository
{
Expand All @@ -28,8 +29,11 @@ public function sendMessage(string $connectionId, string $data): void
'Data' => $data,
]);
} catch (ApiGatewayManagementApiException $e) {
// GoneException: The connection with the provided id no longer exists.
if ($e->getAwsErrorCode() === 'GoneException') {
// GoneException: The connection with the provided id no longer exists.
if (
$e->getStatusCode() === Response::HTTP_GONE ||
$e->getAwsErrorCode() === 'GoneException'
) {
$this->subscriptionRepository->clearConnection($connectionId);

return;
Expand Down
64 changes: 61 additions & 3 deletions src/Handler.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Bref\Event\Http\HttpResponse;
use Illuminate\Support\Arr;
use Illuminate\Support\Str;
use Symfony\Component\HttpFoundation\Response;
use Throwable;

class Handler extends WebsocketHandler
Expand Down Expand Up @@ -42,6 +43,7 @@

protected function handleDisconnect(WebsocketEvent $event, Context $context): void
{
$this->sendPresenceDisconnectNotices($event);
$this->subscriptionRepository->clearConnection($event->getConnectionId());
}

Expand Down Expand Up @@ -115,12 +117,28 @@
}
}

$this->subscriptionRepository->subscribeToChannel($event->getConnectionId(), $channel);
if (Str::startsWith($channel, 'presence-')) {
$this->subscriptionRepository->subscribeToPresenceChannel(
$event->getConnectionId(),

Check warning on line 122 in src/Handler.php

View check run for this annotation

Codecov / codecov/patch

src/Handler.php#L121-L122

Added lines #L121 - L122 were not covered by tests
$channelData,
$channel
);
$data = $this->subscriptionRepository->getUserListForPresenceChannel($channel)
->transform(function ($user) {
$user = json_decode($user, true);
return Arr::get($user, 'user_info', json_encode($user));
})
->toArray();
$this->sendPresenceAdd($event, $channel, Arr::get(json_decode($channelData, true), 'user_info'));

Check warning on line 132 in src/Handler.php

View check run for this annotation

Codecov / codecov/patch

src/Handler.php#L126-L132

Added lines #L126 - L132 were not covered by tests
} else {
$this->subscriptionRepository->subscribeToChannel($event->getConnectionId(), $channel);
$data = [];
}

$this->sendMessage($event, $context, [
'event' => 'subscription_succeeded',
'channel' => $channel,
'data' => [],
'data' => $data,
]);
}

Expand All @@ -138,6 +156,18 @@
]);
}

public function sendPresenceDisconnectNotices(WebsocketEvent $event): void
{
$channels = $this->subscriptionRepository->getChannelsSubscribedToByConnectionId($event->getConnectionId());
$channels->filter(function ($info) {
return Str::startsWith(Arr::get($info, 'channel'), 'presence-');
})->each(function ($info) use ($event) {
$channel = Arr::get($info, 'channel');
$userData = json_decode(Arr::get($info, 'userData'), true);
$this->sendPresenceRemove($event, $channel, Arr::get($userData, 'user_info'));
});
}

public function broadcastToChannel(WebsocketEvent $event, Context $context): void
{
$skipConnectionId = $event->getConnectionId();
Expand All @@ -158,6 +188,34 @@
->each(fn (string $connectionId) => $this->sendMessageToConnection($connectionId, $data));
}

public function sendPresenceAdd(WebsocketEvent $event, string $channel, array $data): void

Check warning on line 191 in src/Handler.php

View check run for this annotation

Codecov / codecov/patch

src/Handler.php#L191

Added line #L191 was not covered by tests
{
$skipConnectionId = $event->getConnectionId();
$eventBody = json_decode($event->getBody(), true);
$data = json_encode([
'event'=>'member_added',
'channel'=>$channel,
'data'=>$data
]) ?: '';
$this->subscriptionRepository->getConnectionIdsForChannel($channel)
->reject(fn ($connectionId) => $connectionId === $skipConnectionId)
->each(fn (string $connectionId) => $this->sendMessageToConnection($connectionId, $data));
}

Check warning on line 203 in src/Handler.php

View check run for this annotation

Codecov / codecov/patch

src/Handler.php#L193-L203

Added lines #L193 - L203 were not covered by tests

public function sendPresenceRemove(WebsocketEvent $event, string $channel, array $data): void
{
$skipConnectionId = $event->getConnectionId();
$eventBody = json_decode($event->getBody(), true);
$data = json_encode([
'event'=>'member_removed',
'channel'=>$channel,
'data'=>$data
]) ?: '';

Check warning on line 213 in src/Handler.php

View check run for this annotation

Codecov / codecov/patch

src/Handler.php#L213

Added line #L213 was not covered by tests
$this->subscriptionRepository->getConnectionIdsForChannel($channel)
->reject(fn ($connectionId) => $connectionId === $skipConnectionId)
->each(fn (string $connectionId) => $this->sendMessageToConnection($connectionId, $data));
}

public function sendMessage(WebsocketEvent $event, Context $context, array $data): void
{
$this->connectionRepository->sendMessage($event->getConnectionId(), json_encode($data, JSON_THROW_ON_ERROR));
Expand All @@ -168,7 +226,7 @@
try {
$this->connectionRepository->sendMessage($connectionId, $data);
} catch (ApiGatewayManagementApiException $exception) {
if ($exception->getAwsErrorCode() === 'GoneException') {
if ($exception->getStatusCode() === Response::HTTP_GONE) {

Check warning on line 229 in src/Handler.php

View check run for this annotation

Codecov / codecov/patch

src/Handler.php#L229

Added line #L229 was not covered by tests
$this->subscriptionRepository->clearConnection($connectionId);
return;
}
Expand Down
49 changes: 49 additions & 0 deletions src/SubscriptionRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,43 @@
->unique();
}

public function getUserListForPresenceChannel(string ...$channels): Collection

Check warning on line 44 in src/SubscriptionRepository.php

View check run for this annotation

Codecov / codecov/patch

src/SubscriptionRepository.php#L44

Added line #L44 was not covered by tests
{
$promises = collect($channels)->map(fn ($channel) => $this->dynamoDb->queryAsync([
'TableName' => $this->table,
'IndexName' => 'lookup-by-channel',
'KeyConditionExpression' => 'channel = :channel',

Check warning on line 49 in src/SubscriptionRepository.php

View check run for this annotation

Codecov / codecov/patch

src/SubscriptionRepository.php#L46-L49

Added lines #L46 - L49 were not covered by tests
'ExpressionAttributeValues' => [
':channel' => ['S' => $channel],

Check warning on line 51 in src/SubscriptionRepository.php

View check run for this annotation

Codecov / codecov/patch

src/SubscriptionRepository.php#L51

Added line #L51 was not covered by tests
],
]))->toArray();

Check warning on line 53 in src/SubscriptionRepository.php

View check run for this annotation

Codecov / codecov/patch

src/SubscriptionRepository.php#L53

Added line #L53 was not covered by tests

$responses = Utils::all($promises)->wait();

Check warning on line 55 in src/SubscriptionRepository.php

View check run for this annotation

Codecov / codecov/patch

src/SubscriptionRepository.php#L55

Added line #L55 was not covered by tests

return collect($responses)
->flatmap(fn (\Aws\Result $result): array => $result['Items'])
->map(fn (array $item): string => Arr::get($item, 'userData.S', ''))
->unique();

Check warning on line 60 in src/SubscriptionRepository.php

View check run for this annotation

Codecov / codecov/patch

src/SubscriptionRepository.php#L57-L60

Added lines #L57 - L60 were not covered by tests
}

public function getChannelsSubscribedToByConnectionId(string $connectionId): Collection

Check warning on line 63 in src/SubscriptionRepository.php

View check run for this annotation

Codecov / codecov/patch

src/SubscriptionRepository.php#L63

Added line #L63 was not covered by tests
{
$response = $this->dynamoDb->query([
'TableName' => $this->table,
'KeyConditionExpression' => 'connectionId = :connectionId',

Check warning on line 67 in src/SubscriptionRepository.php

View check run for this annotation

Codecov / codecov/patch

src/SubscriptionRepository.php#L65-L67

Added lines #L65 - L67 were not covered by tests
'ExpressionAttributeValues' => [
':connectionId' => ['S' => $connectionId],

Check warning on line 69 in src/SubscriptionRepository.php

View check run for this annotation

Codecov / codecov/patch

src/SubscriptionRepository.php#L69

Added line #L69 was not covered by tests
],
]);
return collect(Arr::get($response, 'Items', []))
->transform(function ($item) {

Check warning on line 73 in src/SubscriptionRepository.php

View check run for this annotation

Codecov / codecov/patch

src/SubscriptionRepository.php#L72-L73

Added lines #L72 - L73 were not covered by tests
return [
'channel'=>Arr::get($item, 'channel.S'),
'userData'=>Arr::get($item, 'userData.S'),

Check warning on line 76 in src/SubscriptionRepository.php

View check run for this annotation

Codecov / codecov/patch

src/SubscriptionRepository.php#L75-L76

Added lines #L75 - L76 were not covered by tests
];
});

Check warning on line 78 in src/SubscriptionRepository.php

View check run for this annotation

Codecov / codecov/patch

src/SubscriptionRepository.php#L78

Added line #L78 was not covered by tests
}

public function clearConnection(string $connectionId): void
{
$response = $this->dynamoDb->query([
Expand Down Expand Up @@ -86,4 +123,16 @@
],
]);
}

public function subscribeToPresenceChannel(string $connectionId, string $userData, string $channel): void

Check warning on line 127 in src/SubscriptionRepository.php

View check run for this annotation

Codecov / codecov/patch

src/SubscriptionRepository.php#L127

Added line #L127 was not covered by tests
{
$this->dynamoDb->putItem([
'TableName' => $this->table,

Check warning on line 130 in src/SubscriptionRepository.php

View check run for this annotation

Codecov / codecov/patch

src/SubscriptionRepository.php#L129-L130

Added lines #L129 - L130 were not covered by tests
'Item' => [
'connectionId' => ['S' => $connectionId],
'userData' => ['S' => $userData],
'channel' => ['S' => $channel],

Check warning on line 134 in src/SubscriptionRepository.php

View check run for this annotation

Codecov / codecov/patch

src/SubscriptionRepository.php#L132-L134

Added lines #L132 - L134 were not covered by tests
],
]);
}

Check warning on line 137 in src/SubscriptionRepository.php

View check run for this annotation

Codecov / codecov/patch

src/SubscriptionRepository.php#L137

Added line #L137 was not covered by tests
}
81 changes: 80 additions & 1 deletion tests/HandlerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use GuzzleHttp\Psr7\Response;
use Mockery\Mock;
use Psr\Http\Message\RequestInterface;
use Symfony\Component\HttpFoundation\Response as SymfonyResponse;

it('can subscribe to open channels', function () {
app()->instance(SubscriptionRepository::class, Mockery::mock(SubscriptionRepository::class, function ($mock) {
Expand Down Expand Up @@ -113,7 +114,85 @@
], $context);
});

it('handles dropped connections', function () {
it('leaves presence channels', function () {
app()->instance(SubscriptionRepository::class, Mockery::mock(SubscriptionRepository::class, function ($mock) {
/** @var Mock $mock */
$mock->shouldReceive('getChannelsSubscribedToByConnectionId')->withArgs(function (string $connectionId): bool {
return $connectionId === 'connection-id-1';
})->once()
->andReturn(collect([
[
'channel'=>'presence-channel',
'userData'=>json_encode(['user_info'=>['the user info']]),
],
[
'channel'=>'other-channel',
]
]));
$mock->shouldReceive('getConnectionIdsForChannel')->withArgs(function (string $channel) {
return $channel === 'presence-channel';
})->once()
->andReturn(collect(['connection-id-1', 'connection-id-2']));
$mock->shouldReceive('clearConnection')->withArgs(function (string $connectionId) {
return $connectionId === 'connection-id-1';
})->once();
}));

app()->instance(ConnectionRepository::class, Mockery::mock(ConnectionRepository::class, function ($mock) {
/** @var Mock $mock */
$mock->shouldReceive('sendMessage')->withArgs(function (string $connectionId, string $data): bool {
return $connectionId === 'connection-id-2' and $data === '{"event":"member_removed","channel":"presence-channel","data":["the user info"]}';
})->once();
}));

/** @var Handler $handler */
$handler = app(Handler::class);

$context = new Context('request-id-1', 50_000, 'function-arn', 'trace-id-1');

$handler->handle([
'requestContext' => [
'routeKey' => 'my-test-route-key',
'eventType' => 'DISCONNECT',
'connectionId' => 'connection-id-1',
'domainName' => 'test-domain',
'apiId' => 'api-id-1',
'stage' => 'stage-test',
],
'body' => json_encode(['event' => 'disconnect']),
], $context);
});

it('handles dropped connections with HTTP_GONE', function () {
$mock = new MockHandler();

$mock->append(function (CommandInterface $cmd, RequestInterface $req) {
$mock = Mockery::mock(SymfonyResponse::class, function ($mock) {
$mock->shouldReceive('getStatusCode')
->andReturn(SymfonyResponse::HTTP_GONE);
});
return new ApiGatewayManagementApiException('', $cmd, [
'response' => $mock
]);
});

/** @var SubscriptionRepository */
$subscriptionRepository = Mockery::mock(SubscriptionRepository::class, function ($mock) {
/** @var Mock $mock */
$mock->shouldReceive('clearConnection')->withArgs(function (string $connectionId): bool {
return $connectionId === 'dropped-connection-id-1234';
})->once();
});

$config = config('laravel-echo-api-gateway');

/** @var ConnectionRepository */
$connectionRepository = new ConnectionRepository($subscriptionRepository, array_merge_recursive(['connection' => ['handler' => $mock]], $config));

$connectionRepository->sendMessage('dropped-connection-id-1234', 'test-message');
});

it('handles dropped connections with GoneException', function () {
$mock = new MockHandler();

$mock->append(function (CommandInterface $cmd, RequestInterface $req) {
Expand Down