Source code for zoo.pipeline.api.keras.layers.self_attention
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import numpy as np
import math
from bigdl.nn.layer import Sum
from bigdl.nn.layer import Layer
from zoo.common.utils import callZooFunc
from zoo.models.common import ZooModel
from zoo.pipeline.api.keras.engine import ZooKerasLayer
from zoo.pipeline.api.keras.layers import *
from zoo.pipeline.api.keras.models import Sequential
from zoo.pipeline.api.keras.models import Model
import zoo.pipeline.api.autograd as auto
if sys.version >= '3':
long = int
unicode = str
[docs]def layer_norm(x, w, b, e=1e-5):
sizes = x.get_output_shape()[1:]
u = auto.mean(x, len(sizes), True)
s = auto.mean(auto.square(x - u), len(sizes), True)
y = (x - u) / auto.sqrt(s + e)
y = y * w + b
return y
[docs]class TransformerLayer(ZooKerasLayer):
"""
A self attention layer
Input is a list which consists of 2 ndarrays.
1. Token id ndarray: shape [batch, seqLen] with the word token indices in the vocabulary
2. Position id ndarray: shape [batch, seqLen] with positions in the sentence.
Output is a list which contains:
1. The states of Transformer layer.
2. The pooled output which processes the hidden state of the last layer with regard to the first
token of the sequence. This would be useful for segment-level tasks.
# Arguments
nBlock: block number
hidden_drop: drop probability off projection
attn_drop: drop probability of attention
n_head: head number
initializer_range: weight initialization range
bidirectional: whether unidirectional or bidirectional
output_all_block: whether output all blocks' output
embedding_layer: embedding layer
input_shape: input shape
"""
def __init__(self, n_block, hidden_drop, attn_drop, n_head, initializer_range, bidirectional,
output_all_block, embedding_layer, input_shape, intermediate_size=0,
bigdl_type="float"):
self.hidden_drop = hidden_drop
self.attn_drop = attn_drop
self.n_head = n_head
self.initializer_range = initializer_range
self.output_all_block = output_all_block
self.bidirectional = bidirectional
self.intermediate_size = intermediate_size
self.seq_len = input_shape[0][0]
self.bigdl_type = bigdl_type
if not bidirectional:
mask_value = np.tril(np.ones((self.seq_len, self.seq_len), dtype=bigdl_type))
self.mask_value = auto.Constant(data=mask_value.reshape((1, 1,
self.seq_len, self.seq_len)))
(extended_attention_mask, embedding_inputs, inputs) = self.build_input(input_shape)
embedding = embedding_layer(embedding_inputs)
hidden_size = embedding.get_output_shape()[-1]
next_input = embedding
output = [None] * n_block
output[0] = self.block(next_input, hidden_size, extended_attention_mask)
for index in range(n_block - 1):
o = self.block(output[index], hidden_size, extended_attention_mask)
output[index + 1] = o
pooler_output = self.pooler(output[-1], hidden_size)
model = Model(inputs, output.append(pooler_output)) if output_all_block \
else Model(inputs, [output[-1], pooler_output])
self.value = model.value
[docs] def build_input(self, input_shape):
if any(not isinstance(i, tuple) and not isinstance(i, list) for i in input_shape):
raise TypeError('TransformerLayer input must be a list of ndarray (consisting'
' of input sequence, sequence positions, etc.)')
inputs = [Input(list(shape)) for shape in input_shape]
return None, inputs, inputs
[docs] def block(self, x, size, attention_mask=None, eplision=1e-5):
g = auto.Parameter(shape=(1, size), init_weight=np.ones((1, size), dtype=self.bigdl_type))
b = auto.Parameter(shape=(1, size), init_weight=np.zeros((1, size), dtype=self.bigdl_type))
g2 = auto.Parameter(shape=(1, size), init_weight=np.ones((1, size), dtype=self.bigdl_type))
b2 = auto.Parameter(shape=(1, size), init_weight=np.zeros((1, size), dtype=self.bigdl_type))
a = self.multi_head_self_attention(x, size, attention_mask)
n = layer_norm(x + a, w=g, b=b, e=eplision)
m = self.mlp(n, size)
h = layer_norm(n + m, w=g2, b=b2, e=eplision)
return h
[docs] def projection_layer(self, output_size):
return Convolution1D(output_size, 1, "normal", (0.0, self.initializer_range))
[docs] def multi_head_self_attention(self, x, size, attention_mask=None):
c = self.projection_layer(size * 3)(x)
query = c.slice(2, 0, size)
key = c.slice(2, size, size)
value = c.slice(2, size * 2, size)
q = self.split_heads(query, self.n_head)
k = self.split_heads(key, self.n_head, k=True)
v = self.split_heads(value, self.n_head)
a = self.attn(q, k, v, True, attention_mask)
m = self.merge_heads(a)
n = self.projection_layer(size)(m)
d = Dropout(self.hidden_drop)(n)
return d
[docs] def attn(self, q, k, v, scale=False, attention_mask=None):
w = auto.mm(q, k)
if scale:
w = w / math.sqrt(v.get_output_shape()[-1])
if not self.bidirectional:
w = w * self.mask_value + (self.mask_value * (-1.0) + 1.0) * (-1e9)
if attention_mask:
w = w + attention_mask
w = Activation("softmax")(w)
w = Dropout(self.attn_drop)(w)
w = auto.mm(w, v)
return w
[docs] def mlp(self, x, hidden_size):
size = self.intermediate_size if self.intermediate_size > 0 else hidden_size * 4
h = self.projection_layer(size)(x)
a = self.gelu(h)
h2 = self.projection_layer(hidden_size)(a)
y = Dropout(self.hidden_drop)(h2)
return y
[docs] def gelu(self, x):
y = (auto.square(x) * x * 0.044715 + x) * (math.sqrt(2 / math.pi))
y = Activation("tanh")(y) + 1.0
y = x * 0.5 * y
return y
[docs] def split_heads(self, x, n_head, k=False):
sizes = x.get_output_shape()[1:]
shape = list(sizes + (int(sizes[-1] / n_head),))
shape[-2] = n_head
r = Reshape(shape)(x)
if k:
f = Permute((2, 3, 1))(r)
else:
f = Permute((2, 1, 3))(r)
return f
[docs] def merge_heads(self, x):
p = auto.contiguous(Permute((2, 1, 3))(x))
sizes = p.get_output_shape()[1:]
merge_sizes = list(sizes[:-2] + (sizes[-1] * sizes[-2],))
m = Reshape(merge_sizes)(p)
return m
[docs] def pooler(self, x, hidden_size):
first_token = Select(1, 0)(x)
pooler_output = Dense(hidden_size)(first_token)
o = Activation("tanh")(pooler_output)
return o
[docs] @classmethod
def init(cls, vocab=40990, seq_len=77, n_block=12, hidden_drop=0.1,
attn_drop=0.1, n_head=12, hidden_size=768,
embedding_drop=0.1, initializer_range=0.02,
bidirectional=False, output_all_block=False):
"""
vocab: vocabulary size of training data, default is 40990
seq_len: max sequence length of training data, default is 77
n_block: block number, default is 12
hidden_drop: drop probability of projection, default is 0.1
attn_drop: drop probability of attention, default is 0.1
n_head: head number, default is 12
hidden_size: is also embedding size
embedding_drop: drop probability of embedding layer, default is 0.1
initializer_range: weight initialization range, default is 0.02
bidirectional: whether unidirectional or bidirectional, default is unidirectional
output_all_block: whether output all blocks' output
"""
if hidden_size < 0:
raise TypeError('hidden_size must be greater than 0 with default embedding layer')
from bigdl.nn.layer import Squeeze
word_input = InputLayer(input_shape=(seq_len,))
postion_input = InputLayer(input_shape=(seq_len,))
embedding = Sequential()
embedding.add(Merge(layers=[word_input, postion_input], mode='concat')) \
.add(Reshape([seq_len * 2])) \
.add(Embedding(vocab, hidden_size, input_length=seq_len * 2,
weights=np.random.normal(0.0, initializer_range, (vocab, hidden_size))))\
.add(Dropout(embedding_drop)) \
.add(Reshape((seq_len, 2, hidden_size))) \
.add(KerasLayerWrapper(Sum(dimension=3, squeeze=True)))
# walk around for bug #1208, need remove this line after the bug fixed
embedding.add(KerasLayerWrapper(Squeeze(dim=3)))
shape = ((seq_len,), (seq_len,))
return TransformerLayer(n_block, hidden_drop, attn_drop, n_head, initializer_range,
bidirectional, output_all_block, embedding, input_shape=shape)
[docs]class BERT(TransformerLayer):
"""
A self attention layer.
Input is a list which consists of 4 ndarrays.
1. Token id ndarray: shape [batch, seqLen] with the word token indices in the vocabulary
2. Token type id ndarray: shape [batch, seqLen] with the token types in [0, 1].
0 means `sentence A` and 1 means a `sentence B` (see BERT paper for more details).
3. Position id ndarray: shape [batch, seqLen] with positions in the sentence.
4. Attention_mask ndarray: shape [batch, seqLen] with indices in [0, 1].
It's a mask to be used if the input sequence length is smaller than seqLen in
the current batch.
Output is a list which contains:
1. The states of BERT layer.
2. The pooled output which processes the hidden state of the last layer with regard to the first
token of the sequence. This would be useful for segment-level tasks.
# Arguments
n_block: block number
n_head: head number
intermediate_size: The size of the "intermediate" (i.e., feed-forward)
hidden_drop: The dropout probability for all fully connected layers
attn_drop: drop probability of attention
initializer_ranger: weight initialization range
output_all_block: whether output all blocks' output
embedding_layer: embedding layer
input_shape: input shape
"""
def __init__(self, n_block, n_head, intermediate_size, hidden_drop, attn_drop,
initializer_range, output_all_block, embedding_layer,
input_shape, bigdl_type="float"):
self.hidden_drop = hidden_drop
self.attn_drop = attn_drop
self.n_head = n_head
self.intermediate_size = intermediate_size
self.output_all_block = output_all_block
self.bigdl_type = bigdl_type
self.seq_len = input_shape[0][0]
self.initializer_range = initializer_range
self.bidirectional = True
self.n_block = n_block
word_input = Input(shape=input_shape[0])
token_type_input = Input(shape=input_shape[1])
position_input = Input(shape=input_shape[2])
attention_mask = Input(shape=input_shape[3])
e = embedding_layer([word_input, token_type_input, position_input])
self.hidden_size = e.get_output_shape()[-1]
extended_attention_mask = (- attention_mask + 1.0) * -10000.0
next_input = e
model_output = [None] * n_block
model_output[0] = self.block(next_input, self.hidden_size, extended_attention_mask)
for _ in range(n_block - 1):
output = self.block(model_output[_], self.hidden_size, extended_attention_mask)
model_output[_ + 1] = output
pooler_output = self.pooler(model_output[-1], self.hidden_size)
if output_all_block:
model_output.append(pooler_output)
model = Model([word_input, token_type_input, position_input, attention_mask],
model_output)
else:
model = Model([word_input, token_type_input, position_input, attention_mask],
[model_output[-1], pooler_output])
self.value = model.value
[docs] def projection_layer(self, output_size):
return Dense(output_size, "normal", (0.0, self.initializer_range))
[docs] def build_input(self, input_shape):
if any(not isinstance(i, list) and not isinstance(i, tuple) for i in input_shape) \
and len(input_shape) != 4:
raise TypeError('BERT input must be a list of 4 ndarray (consisting of input'
' sequence, sequence positions, segment id, attention mask)')
inputs = [Input(list(shape)) for shape in input_shape]
return (- inputs[-1] + 1.0) * -10000.0, inputs[:-1], inputs
[docs] @classmethod
def init(cls, vocab=40990, hidden_size=768, n_block=12, n_head=12,
seq_len=512, intermediate_size=3072, hidden_drop=0.1,
attn_drop=0.1, initializer_range=0.02, output_all_block=True,
bigdl_type="float"):
"""
vocab: vocabulary size of training data, default is 40990
hidden_size: size of the encoder layers, default is 768
n_block: block number, default is 12
n_head: head number, default is 12
seq_len: max sequence length of training data, default is 77
intermediate_size: The size of the "intermediate" (i.e., feed-forward)
hidden_drop: drop probability of full connected layers, default is 0.1
attn_drop: drop probability of attention, default is 0.1
initializer_ranger: weight initialization range, default is 0.02
output_all_block: whether output all blocks' output, default is True
"""
word_input = Input(shape=(seq_len,))
token_type_input = Input(shape=(seq_len,))
position_input = Input(shape=(seq_len,))
word_embedding = Embedding(vocab, hidden_size, input_length=seq_len,
weights=np.random.normal(0.0, initializer_range,
(vocab, hidden_size)))(word_input)
position_embedding = Embedding(seq_len, hidden_size, input_length=seq_len,
weights=np.random.normal(0.0, initializer_range,
(seq_len, hidden_size)))(
position_input)
token_type_embedding = Embedding(2, hidden_size, input_length=seq_len,
weights=np.random.normal(0.0, initializer_range,
(2, hidden_size)))(
token_type_input)
embedding = word_embedding + position_embedding + token_type_embedding
w = auto.Parameter(shape=(1, hidden_size),
init_weight=np.ones((1, hidden_size), dtype=bigdl_type))
b = auto.Parameter(shape=(1, hidden_size),
init_weight=np.zeros((1, hidden_size), dtype=bigdl_type))
after_norm = layer_norm(embedding, w, b, 1e-12)
h = Dropout(hidden_drop)(after_norm)
embedding_layer = Model([word_input, token_type_input, position_input], h)
shape = ((seq_len,), (seq_len,), (seq_len,), (1, 1, seq_len))
return BERT(n_block, n_head, intermediate_size, hidden_drop, attn_drop, initializer_range,
output_all_block, embedding_layer, input_shape=shape)
[docs] @staticmethod
def init_from_existing_model(path, weight_path=None, input_seq_len=-1.0, hidden_drop=-1.0,
attn_drop=-1.0, output_all_block=True, bigdl_type="float"):
"""
Load an existing BERT model (with weights).
# Arguments
path: The path for the pre-defined model.
Local file system, HDFS and Amazon S3 are supported.
HDFS path should be like 'hdfs://[host]:[port]/xxx'.
Amazon S3 path should be like 's3a://bucket/xxx'.
weight_path: The path for pre-trained weights if any. Default is None.
"""
jlayer = callZooFunc(bigdl_type, "loadBERT", path, weight_path, input_seq_len,
hidden_drop, attn_drop, output_all_block)
model = Layer(jvalue=jlayer, bigdl_type=bigdl_type)
model.__class__ = BERT
return model