-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathframe_streamer.py
207 lines (175 loc) · 7.07 KB
/
frame_streamer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
###############################################################################
#
# Copyright (c) 2017-2020 Master AI, Inc.
# ALL RIGHTS RESERVED
#
# Use of this library, in source or binary form, is prohibited without written
# approval from Master AI, Inc.
#
###############################################################################
"""
This module provides helper functions for plotting and streaming camera
frames, or for plotting/streaming any RGB or grey image buffer.
This is a **synchronous** interface.
"""
from auto import console
from auto import _ctx_print_all
from auto.labs import send_message_to_labs
from auto.camera import base64_encode_image
import cv2
import numpy as np
import PIL.Image
OPTIMAL_ASPECT_RATIO = 4/3
def plot(frames, also_stream=True, verbose=False):
"""
Stitch together the given `frames` (a numpy nd-array) into a single nd-array.
If running in a notebook then the PIL image will be returned (and displayed).
This function by default also streams the image to your `labs` account.
The `frames` parameter must be a numpy ndarray with one of the
following shapes:
- (n, h, w, 3) meaning `n` 3-channel RGB images of size `w`x`h`
- (n, h, w, 1) meaning `n` 1-channel gray images of size `w`x`h`
- (h, w, 3) meaning a single 3-channel RGB image of size `w`x`h`
- (h, w, 1) meaning a single 1-channel gray image of size `w`x`h`
- (h, w) meaning a single 1-channel gray image of size `w`x`h`
"""
# Ensure the proper shape of `frames`.
if frames.ndim == 4:
pass
elif frames.ndim == 3:
frames = np.expand_dims(frames, axis=0)
elif frames.ndim == 2:
frames = np.expand_dims(frames, axis=2)
frames = np.expand_dims(frames, axis=0)
else:
raise Exception("invalid frames ndarray ndim")
if frames.shape[3] != 3 and frames.shape[3] != 1:
raise Exception("invalid number of channels")
if verbose:
n = frames.shape[0]
_ctx_print_all("Plotting {} frame{}...".format(n, 's' if n != 1 else ''))
montage = _create_montage(frames)
if also_stream:
stream(montage, to_labs=True, verbose=False)
return PIL.Image.fromarray(np.squeeze(montage)) if _in_notebook() else None
def _in_notebook():
"""
Determine if the current process is running in a jupyter notebook / iPython shell
Returns: boolean
"""
try:
shell = get_ipython().__class__.__name__
if shell == 'ZMQInteractiveShell':
result = True # jupyter notebook
elif shell == 'TerminalInteractiveShell':
result = True # iPython via terminal
else:
result = False # unknown shell type
except NameError:
result = False # most likely standard Python interpreter
return result
def _create_montage(frames):
"""
Stitch together all frames into 1 montage image:
Each frame shape is (height x width x channels).
Only supports 1 to 4 frames:
frame_count | result
1 | 1 row x 1 col
2 | 1 row x 2 col
3 | 2 row x 2 col (4th frame all white)
4 | 2 row x 2 col
Args:
frames: nd-array or list of nd-arrays
Returns: nd-array of shape (row_count * frame_height, column_count * frame_width, channels)
"""
n = frames.shape[0]
if n > 4:
raise NotImplementedError("currently you may only montage up to 4 frames")
MAX_COLS = 2
frames = np.array(frames)
if n == 1:
montage = frames[0]
else:
frames = [_shrink_img(frame, factor=1/MAX_COLS) for frame in frames]
rows_of_frames = [frames[i:i+MAX_COLS] for i in range(0, len(frames), MAX_COLS)]
rows_of_combined_frames = [(np.hstack(row) if len(row) == MAX_COLS # stitch frames together
else np.hstack([row[0], np.full_like(row[0],255)]) # stitch frame with white frame
) for row in rows_of_frames]
montage = np.vstack(rows_of_combined_frames)
return montage
def _shrink_img(img, factor=0.5):
"""
Reduce the img dimensions to factor*100 percent.
Args:
img: nd-array with shape (height, width, channels)
Returns: nd-array with shape (height*factor, width*factor, channels)
"""
shrunk_image = cv2.resize(img,None,fx=factor,fy=factor,interpolation=cv2.INTER_AREA)
return shrunk_image
def stream(frame, to_console=True, to_labs=False, verbose=False):
"""
Stream the given `frame` (a numpy ndarray) to your device's
console _and_ (optionally) to your `labs` account to be shown
in your browser.
The `frame` parameter must be a numpy ndarray with one of the
following shapes:
- (h, w, 3) meaning a single 3-channel RGB image of size `w`x`h`
- (h, w, 1) meaning a single 1-channel gray image of size `w`x`h`
- (h, w) meaning a single 1-channel gray image of size `w`x`h`
"""
if frame is None:
if to_console:
console.clear_image()
if to_labs:
send_message_to_labs({'base64_img': ''})
return
# Publish the uncompressed frame to the console UI.
if to_console:
if frame.ndim == 3:
if frame.shape[2] == 3:
pass # all good
elif frame.shape[2] == 1:
pass # all good
else:
raise Exception("invalid number of channels")
elif frame.ndim == 2:
frame = np.expand_dims(frame, axis=2)
assert frame.ndim == 3 and frame.shape[2] == 1
else:
raise Exception(f"invalid frame ndarray ndim: {frame.ndim}")
height, width, channels = frame.shape
aspect_ratio = width / height
if aspect_ratio != OPTIMAL_ASPECT_RATIO:
final_frame = _add_white_bars(frame)
height, width, channels = final_frame.shape
else:
final_frame = frame
shape = [width, height, channels]
rect = [0, 0, 0, 0]
console.stream_image(rect, shape, final_frame.tobytes())
# Encode the frame and publish to the network connection.
if to_labs:
base64_img = base64_encode_image(frame)
send_message_to_labs({'base64_img': base64_img})
if verbose:
h, w = frame.shape[:2]
_ctx_print_all("Streamed frame of size {}x{}.".format(w, h))
def _add_white_bars(frame):
"""
This function is intended for a wide image that needs white bars
on top and bottom so as to not be stretched when displayed.
Args:
frame: nd-array (height, width, channels)
Returns: nd-array (height, width, channels) with the OPTIMAL_ASPECT_RATIO
"""
height, width, channels = frame.shape
aspect_ratio = width / height
if aspect_ratio > OPTIMAL_ASPECT_RATIO:
# add horizontal bars
bar_height = int(((width / OPTIMAL_ASPECT_RATIO) - height )/ 2)
bar = np.ones((bar_height, width, channels), dtype=frame.dtype) * 255
frame = np.vstack((bar, frame, bar))
else:
# add vertical bars
pass
return frame