3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
21 * Classes for reading and writing Avro data to AvroIO objects.
25 * @todo Implement JSON encoding, as is required by the Avro spec.
/**
 * Exceptions arising from writing or reading Avro data.
 *
 * Thrown when a value does not conform to the schema it is checked against.
 */
class AvroIOTypeException extends AvroException
{
  /**
   * @param AvroSchema $expected_schema schema the datum was validated against;
   *                                    it is interpolated into the message with %s
   * @param mixed $datum the value that failed validation
   */
  public function __construct($expected_schema, $datum)
  {
    $datum_repr = var_export($datum, true);
    $message = sprintf('The datum %s is not an example of schema %s',
                       $datum_repr, $expected_schema);
    parent::__construct($message);
  }
}
/**
 * Exceptions arising from incompatibility between
 * reader and writer schemas.
 */
class AvroIOSchemaMatchException extends AvroException
{
  /**
   * @param AvroSchema $writers_schema schema the data was written with
   * @param AvroSchema $readers_schema schema the caller asked to read it as
   */
  public function __construct($writers_schema, $readers_schema)
  {
    $template = "Writer's schema %s and Reader's schema %s do not match.";
    parent::__construct(sprintf($template, $writers_schema, $readers_schema));
  }
}
/**
 * Handles schema-specific writing of data to the encoder.
 *
 * Ensures that each datum written is consistent with the writer's schema.
 */
class AvroIODatumWriter
{
  /**
   * Schema used by this instance to write Avro data.
   * @var AvroSchema
   */
  private $writers_schema;

  /**
   * @param AvroSchema $writers_schema schema used by write()
   */
  function __construct($writers_schema=null)
  {
    $this->writers_schema = $writers_schema;
  }

  /**
   * Validates $datum against $writers_schema and encodes it with $encoder.
   *
   * @param AvroSchema $writers_schema
   * @param mixed $datum
   * @param AvroIOBinaryEncoder $encoder
   * @returns mixed whatever the type-specific write method returns
   *
   * @throws AvroIOTypeException if $datum is invalid for $writers_schema
   * @throws AvroException if the schema type is not recognized
   */
  function write_data($writers_schema, $datum, $encoder)
  {
    if (!AvroSchema::is_valid_datum($writers_schema, $datum))
      throw new AvroIOTypeException($writers_schema, $datum);

    switch ($writers_schema->type())
    {
      case AvroSchema::NULL_TYPE:
        return $encoder->write_null($datum);
      case AvroSchema::BOOLEAN_TYPE:
        return $encoder->write_boolean($datum);
      case AvroSchema::INT_TYPE:
        return $encoder->write_int($datum);
      case AvroSchema::LONG_TYPE:
        return $encoder->write_long($datum);
      case AvroSchema::FLOAT_TYPE:
        return $encoder->write_float($datum);
      case AvroSchema::DOUBLE_TYPE:
        return $encoder->write_double($datum);
      case AvroSchema::STRING_TYPE:
        return $encoder->write_string($datum);
      case AvroSchema::BYTES_TYPE:
        return $encoder->write_bytes($datum);
      case AvroSchema::ARRAY_SCHEMA:
        return $this->write_array($writers_schema, $datum, $encoder);
      case AvroSchema::MAP_SCHEMA:
        return $this->write_map($writers_schema, $datum, $encoder);
      case AvroSchema::FIXED_SCHEMA:
        return $this->write_fixed($writers_schema, $datum, $encoder);
      case AvroSchema::ENUM_SCHEMA:
        return $this->write_enum($writers_schema, $datum, $encoder);
      case AvroSchema::RECORD_SCHEMA:
      case AvroSchema::ERROR_SCHEMA:
      case AvroSchema::REQUEST_SCHEMA:
        return $this->write_record($writers_schema, $datum, $encoder);
      case AvroSchema::UNION_SCHEMA:
        return $this->write_union($writers_schema, $datum, $encoder);
      default:
        // Use type() like every other dispatch in this class.
        throw new AvroException(sprintf('Unknown type: %s',
                                        $writers_schema->type()));
    }
  }

  /**
   * Writes $datum using the schema given to the constructor.
   *
   * @param mixed $datum
   * @param AvroIOBinaryEncoder $encoder
   */
  function write($datum, $encoder)
  {
    $this->write_data($this->writers_schema, $datum, $encoder);
  }

  /**
   * Writes an array as a single block: item count, the items, then a
   * terminating zero count. An empty array is just the zero count.
   *
   * @param AvroSchema $writers_schema
   * @param null|boolean|int|float|string|array $datum item to be written
   * @param AvroIOBinaryEncoder $encoder
   */
  private function write_array($writers_schema, $datum, $encoder)
  {
    $datum_count = count($datum);
    if (0 < $datum_count)
    {
      $encoder->write_long($datum_count);
      $items = $writers_schema->items();
      foreach ($datum as $item)
        $this->write_data($items, $item, $encoder);
    }
    return $encoder->write_long(0);
  }

  /**
   * Writes a map as a single block of (string key, value) pairs followed
   * by a terminating zero count.
   *
   * @param AvroSchema $writers_schema
   * @param array $datum
   * @param AvroIOBinaryEncoder $encoder
   */
  private function write_map($writers_schema, $datum, $encoder)
  {
    $datum_count = count($datum);
    if ($datum_count > 0)
    {
      $encoder->write_long($datum_count);
      $values = $writers_schema->values();
      foreach ($datum as $k => $v)
      {
        $encoder->write_string($k);
        $this->write_data($values, $v, $encoder);
      }
    }
    return $encoder->write_long(0);
  }

  /**
   * Writes the index of the first union branch that accepts $datum,
   * followed by $datum encoded with that branch.
   *
   * @param AvroSchema $writers_schema union schema
   * @param mixed $datum
   * @param AvroIOBinaryEncoder $encoder
   * @throws AvroIOTypeException if no branch accepts $datum
   */
  private function write_union($writers_schema, $datum, $encoder)
  {
    $datum_schema_index = -1;
    $datum_schema = null;
    foreach ($writers_schema->schemas() as $index => $schema)
      if (AvroSchema::is_valid_datum($schema, $datum))
      {
        $datum_schema_index = $index;
        $datum_schema = $schema;
        // Unions resolve to the FIRST matching branch; stop here.
        break;
      }

    if (is_null($datum_schema))
      throw new AvroIOTypeException($writers_schema, $datum);

    $encoder->write_long($datum_schema_index);
    $this->write_data($datum_schema, $datum, $encoder);
  }

  /**
   * Writes an enum symbol as its index within $writers_schema.
   */
  private function write_enum($writers_schema, $datum, $encoder)
  {
    $datum_index = $writers_schema->symbol_index($datum);
    return $encoder->write_int($datum_index);
  }

  /**
   * Writes a fixed value as its raw bytes.
   */
  private function write_fixed($writers_schema, $datum, $encoder)
  {
    /**
     * NOTE Unused $writers_schema parameter included for consistency
     * with other write_* methods.
     */
    return $encoder->write($datum);
  }

  /**
   * Writes each record field, in schema field order, from $datum keyed
   * by field name.
   */
  private function write_record($writers_schema, $datum, $encoder)
  {
    foreach ($writers_schema->fields() as $field)
      $this->write_data($field->type(), $datum[$field->name()], $encoder);
  }
}
/**
 * Encodes and writes Avro data to an AvroIO object using
 * Avro binary encoding.
 */
class AvroIOBinaryEncoder
{
  /**
   * Performs encoding of the given float value to a binary string
   *
   * XXX: This is <b>not</b> endian-aware! The {@link Avro::check_platform()}
   * called in {@link AvroIOBinaryEncoder::__construct()} should ensure the
   * library is only used on little-endian platforms, which ensure the little-endian
   * encoding required by the Avro spec.
   *
   * @param float $float
   * @returns string bytes
   * @see Avro::check_platform()
   */
  static function float_to_int_bits($float)
  {
    return pack('f', (float) $float);
  }

  /**
   * Performs encoding of the given double value to a binary string
   *
   * XXX: This is <b>not</b> endian-aware! See comments in
   * {@link AvroIOBinaryEncoder::float_to_int_bits()} for details.
   *
   * @param double $double
   * @returns string bytes
   */
  static function double_to_long_bits($double)
  {
    return pack('d', (double) $double);
  }

  /**
   * Zig-zag + variable-length encodes a 64-bit integer.
   *
   * @param int|string $n
   * @returns string long $n encoded as bytes
   * @internal This relies on 64-bit PHP.
   */
  static public function encode_long($n)
  {
    $n = (int) $n;
    // Zig-zag: map signed values onto unsigned so small magnitudes are short.
    $n = ($n << 1) ^ ($n >> 63);
    $str = '';
    while (0 != ($n & ~0x7F))
    {
      $str .= chr(($n & 0x7F) | 0x80);
      // PHP's >> is arithmetic. Clear the 7 sign-extended high bits so this
      // acts as an unsigned shift. Otherwise a zig-zag value with bit 63 set
      // (|n| >= 2^62) never reaches zero and the loop runs forever.
      $n = ($n >> 7) & (PHP_INT_MAX >> 6);
    }
    $str .= chr($n);
    return $str;
  }

  /**
   * @var AvroIO destination for encoded bytes
   */
  private $io;

  /**
   * @param AvroIO $io object to which data is to be written.
   */
  function __construct($io)
  {
    Avro::check_platform();
    $this->io = $io;
  }

  /**
   * Null occupies no bytes, so nothing is written.
   *
   * @param null $datum actual value is ignored
   */
  function write_null($datum) { return null; }

  /**
   * Writes a single byte: 1 for a truthy $datum, 0 otherwise.
   *
   * @param boolean $datum
   */
  function write_boolean($datum)
  {
    $byte = $datum ? chr(1) : chr(0);
    $this->write($byte);
  }

  /**
   * Ints share the long encoding.
   *
   * @param int $datum
   */
  function write_int($datum) { $this->write_long($datum); }

  /**
   * Writes $n as a zig-zag varint, delegating to AvroGMP when
   * Avro::uses_gmp() is true.
   *
   * @param int|string $n
   */
  function write_long($n)
  {
    if (Avro::uses_gmp())
      $this->write(AvroGMP::encode_long($n));
    else
      $this->write(self::encode_long($n));
  }

  /**
   * @param float $datum
   * @uses self::float_to_int_bits()
   */
  public function write_float($datum)
  {
    $this->write(self::float_to_int_bits($datum));
  }

  /**
   * @param float $datum
   * @uses self::double_to_long_bits()
   */
  public function write_double($datum)
  {
    $this->write(self::double_to_long_bits($datum));
  }

  /**
   * @param string $str
   * @uses self::write_bytes()
   */
  function write_string($str) { $this->write_bytes($str); }

  /**
   * Writes the byte length of $bytes as a long, then the bytes themselves.
   *
   * @param string $bytes
   */
  function write_bytes($bytes)
  {
    $this->write_long(strlen($bytes));
    $this->write($bytes);
  }

  /**
   * @param string $datum raw bytes passed straight to the AvroIO object
   */
  function write($datum) { $this->io->write($datum); }
}
/**
 * Handles schema-specific reading of data from the decoder.
 *
 * Also handles schema resolution between the reader and writer
 * schemas (if a writer's schema is provided).
 */
class AvroIODatumReader
{
  /**
   * Checks whether data written with $writers_schema may be read as
   * $readers_schema. Unions on either side are accepted here and are
   * resolved branch by branch in read_data().
   *
   * @param AvroSchema $writers_schema
   * @param AvroSchema $readers_schema
   * @returns boolean true if the schemas are consistent with
   *          each other and false otherwise.
   */
  static function schemas_match($writers_schema, $readers_schema)
  {
    $writers_schema_type = $writers_schema->type();
    $readers_schema_type = $readers_schema->type();

    if (AvroSchema::UNION_SCHEMA == $writers_schema_type
        || AvroSchema::UNION_SCHEMA == $readers_schema_type)
      return true;

    if ($writers_schema_type == $readers_schema_type)
    {
      if (AvroSchema::is_primitive_type($writers_schema_type))
        return true;

      switch ($readers_schema_type)
      {
        case AvroSchema::MAP_SCHEMA:
          return self::attributes_match($writers_schema->values(),
                                        $readers_schema->values(),
                                        array(AvroSchema::TYPE_ATTR));
        case AvroSchema::ARRAY_SCHEMA:
          return self::attributes_match($writers_schema->items(),
                                        $readers_schema->items(),
                                        array(AvroSchema::TYPE_ATTR));
        case AvroSchema::ENUM_SCHEMA:
          return self::attributes_match($writers_schema, $readers_schema,
                                        array(AvroSchema::FULLNAME_ATTR));
        case AvroSchema::FIXED_SCHEMA:
          return self::attributes_match($writers_schema, $readers_schema,
                                        array(AvroSchema::FULLNAME_ATTR,
                                              AvroSchema::SIZE_ATTR));
        case AvroSchema::RECORD_SCHEMA:
        case AvroSchema::ERROR_SCHEMA:
          return self::attributes_match($writers_schema, $readers_schema,
                                        array(AvroSchema::FULLNAME_ATTR));
        case AvroSchema::REQUEST_SCHEMA:
          // XXX: This seems wrong
          return true;
      }
      return false;
    }

    // Numeric promotions only ever apply when the two types DIFFER, so they
    // must be evaluated outside the equal-types branch above.
    if (AvroSchema::INT_TYPE == $writers_schema_type
        && in_array($readers_schema_type, array(AvroSchema::LONG_TYPE,
                                                AvroSchema::FLOAT_TYPE,
                                                AvroSchema::DOUBLE_TYPE), true))
      return true;

    if (AvroSchema::LONG_TYPE == $writers_schema_type
        && in_array($readers_schema_type, array(AvroSchema::FLOAT_TYPE,
                                                AvroSchema::DOUBLE_TYPE), true))
      return true;

    if (AvroSchema::FLOAT_TYPE == $writers_schema_type
        && AvroSchema::DOUBLE_TYPE == $readers_schema_type)
      return true;

    return false;
  }

  /**
   * Checks equivalence of the given attributes of the two given schemas.
   *
   * @param AvroSchema $schema_one
   * @param AvroSchema $schema_two
   * @param string[] $attribute_names array of string attribute names to compare
   *
   * @returns boolean true if the attributes match and false otherwise.
   */
  static function attributes_match($schema_one, $schema_two, $attribute_names)
  {
    foreach ($attribute_names as $attribute_name)
      if ($schema_one->attribute($attribute_name)
          != $schema_two->attribute($attribute_name))
        return false;
    return true;
  }

  /**
   * @var AvroSchema schema the data was written with
   */
  private $writers_schema;

  /**
   * @var AvroSchema schema the caller wants the data presented as
   */
  private $readers_schema;

  /**
   * @param AvroSchema $writers_schema
   * @param AvroSchema $readers_schema
   */
  function __construct($writers_schema=null, $readers_schema=null)
  {
    $this->writers_schema = $writers_schema;
    $this->readers_schema = $readers_schema;
  }

  /**
   * Sets the WRITER's schema. The parameter name is historical and kept
   * for backward compatibility.
   *
   * @param AvroSchema $readers_schema the writer's schema
   */
  public function set_writers_schema($readers_schema)
  {
    $this->writers_schema = $readers_schema;
  }

  /**
   * Reads one datum. If no reader's schema was supplied, the writer's
   * schema is used for both sides.
   *
   * @param AvroIOBinaryDecoder $decoder
   * @returns mixed
   */
  public function read($decoder)
  {
    if (is_null($this->readers_schema))
      $this->readers_schema = $this->writers_schema;
    return $this->read_data($this->writers_schema, $this->readers_schema,
                            $decoder);
  }

  /**
   * Reads data encoded with $writers_schema, resolving it against
   * $readers_schema.
   *
   * @param AvroSchema $writers_schema
   * @param AvroSchema $readers_schema
   * @param AvroIOBinaryDecoder $decoder
   * @returns mixed
   *
   * @throws AvroIOSchemaMatchException if the schemas cannot be resolved
   * @throws AvroException if the writer's schema type is unknown
   */
  public function read_data($writers_schema, $readers_schema, $decoder)
  {
    if (!self::schemas_match($writers_schema, $readers_schema))
      throw new AvroIOSchemaMatchException($writers_schema, $readers_schema);

    // Schema resolution: reader's schema is a union, writer's schema is not.
    // Use the first reader branch that matches the writer.
    if (AvroSchema::UNION_SCHEMA == $readers_schema->type()
        && AvroSchema::UNION_SCHEMA != $writers_schema->type())
    {
      foreach ($readers_schema->schemas() as $schema)
        if (self::schemas_match($writers_schema, $schema))
          return $this->read_data($writers_schema, $schema, $decoder);
      throw new AvroIOSchemaMatchException($writers_schema, $readers_schema);
    }

    switch ($writers_schema->type())
    {
      case AvroSchema::NULL_TYPE:
        return $decoder->read_null();
      case AvroSchema::BOOLEAN_TYPE:
        return $decoder->read_boolean();
      case AvroSchema::INT_TYPE:
        return self::promote_numeric($decoder->read_int(), $readers_schema);
      case AvroSchema::LONG_TYPE:
        return self::promote_numeric($decoder->read_long(), $readers_schema);
      case AvroSchema::FLOAT_TYPE:
        return self::promote_numeric($decoder->read_float(), $readers_schema);
      case AvroSchema::DOUBLE_TYPE:
        return $decoder->read_double();
      case AvroSchema::STRING_TYPE:
        return $decoder->read_string();
      case AvroSchema::BYTES_TYPE:
        return $decoder->read_bytes();
      case AvroSchema::ARRAY_SCHEMA:
        return $this->read_array($writers_schema, $readers_schema, $decoder);
      case AvroSchema::MAP_SCHEMA:
        return $this->read_map($writers_schema, $readers_schema, $decoder);
      case AvroSchema::UNION_SCHEMA:
        return $this->read_union($writers_schema, $readers_schema, $decoder);
      case AvroSchema::ENUM_SCHEMA:
        return $this->read_enum($writers_schema, $readers_schema, $decoder);
      case AvroSchema::FIXED_SCHEMA:
        return $this->read_fixed($writers_schema, $readers_schema, $decoder);
      case AvroSchema::RECORD_SCHEMA:
      case AvroSchema::ERROR_SCHEMA:
      case AvroSchema::REQUEST_SCHEMA:
        return $this->read_record($writers_schema, $readers_schema, $decoder);
      default:
        throw new AvroException(sprintf("Cannot read unknown schema type: %s",
                                        $writers_schema->type()));
    }
  }

  /**
   * Applies numeric promotion. When the reader asks for float or double,
   * an int/long/float value read with the writer's type is cast to float.
   *
   * @param int|float|string $datum value decoded with the writer's type
   * @param AvroSchema $readers_schema
   * @returns int|float|string
   */
  private static function promote_numeric($datum, $readers_schema)
  {
    $readers_type = $readers_schema->type();
    if (AvroSchema::FLOAT_TYPE == $readers_type
        || AvroSchema::DOUBLE_TYPE == $readers_type)
      return (float) $datum;
    return $datum;
  }

  /**
   * Reads blocks of array items until a zero count is read. A negative
   * count is followed by a block byte size, which is read and ignored.
   *
   * @returns array
   */
  public function read_array($writers_schema, $readers_schema, $decoder)
  {
    $items = array();
    $block_count = $decoder->read_long();
    while (0 != $block_count)
    {
      if ($block_count < 0)
      {
        $block_count = -$block_count;
        $decoder->read_long(); // Read (and ignore) block size
      }
      for ($i = 0; $i < $block_count; $i++)
        $items []= $this->read_data($writers_schema->items(),
                                    $readers_schema->items(),
                                    $decoder);
      $block_count = $decoder->read_long();
    }
    return $items;
  }

  /**
   * Reads blocks of (string key, value) pairs until a zero count is read.
   * A negative count is followed by a block byte size, which is ignored.
   *
   * @returns array
   */
  public function read_map($writers_schema, $readers_schema, $decoder)
  {
    $items = array();
    $pair_count = $decoder->read_long();
    while (0 != $pair_count)
    {
      if ($pair_count < 0)
      {
        $pair_count = -$pair_count;
        // Note: we're not doing anything with block_size other than skipping it
        $decoder->read_long();
      }

      for ($i = 0; $i < $pair_count; $i++)
      {
        $key = $decoder->read_string();
        $items[$key] = $this->read_data($writers_schema->values(),
                                        $readers_schema->values(),
                                        $decoder);
      }
      $pair_count = $decoder->read_long();
    }
    return $items;
  }

  /**
   * Reads the writer's branch index, then the datum with that branch.
   *
   * @returns mixed
   */
  public function read_union($writers_schema, $readers_schema, $decoder)
  {
    $schema_index = $decoder->read_long();
    $selected_writers_schema = $writers_schema->schema_by_index($schema_index);
    return $this->read_data($selected_writers_schema, $readers_schema, $decoder);
  }

  /**
   * Reads an enum index and maps it to the writer's symbol.
   *
   * @returns string
   * @throws AvroIOSchemaMatchException if the reader's enum lacks the symbol
   */
  public function read_enum($writers_schema, $readers_schema, $decoder)
  {
    $symbol_index = $decoder->read_int();
    $symbol = $writers_schema->symbol_by_index($symbol_index);
    // Schema resolution: a writer symbol unknown to the reader is an error.
    if (!$readers_schema->has_symbol($symbol))
      throw new AvroIOSchemaMatchException($writers_schema, $readers_schema);
    return $symbol;
  }

  /**
   * Reads a fixed value of the writer's declared size.
   *
   * @returns string
   */
  public function read_fixed($writers_schema, $readers_schema, $decoder)
  {
    return $decoder->read($writers_schema->size());
  }

  /**
   * Reads a record field by field, in writer order, matching fields by name.
   * Writer-only fields are skipped. Reader-only fields get their default.
   *
   * @returns array field name => value
   * @throws AvroIOSchemaMatchException if a reader-only field has no default
   */
  public function read_record($writers_schema, $readers_schema, $decoder)
  {
    $readers_fields = $readers_schema->fields_hash();
    $record = array();
    foreach ($writers_schema->fields() as $writers_field)
    {
      $type = $writers_field->type();
      $field_name = $writers_field->name();
      if (isset($readers_fields[$field_name]))
        $record[$field_name]
          = $this->read_data($type,
                             $readers_fields[$field_name]->type(),
                             $decoder);
      else
        $this->skip_data($type, $decoder);
    }

    // Fill in default values for reader fields the writer did not provide.
    if (count($readers_fields) > count($record))
    {
      $writers_fields = $writers_schema->fields_hash();
      foreach ($readers_fields as $field_name => $field)
      {
        if (isset($writers_fields[$field_name]))
          continue;
        if (!$field->has_default_value())
          throw new AvroIOSchemaMatchException($writers_schema, $readers_schema);
        $record[$field->name()]
          = $this->read_default_value($field->type(),
                                      $field->default_value());
      }
    }

    return $record;
  }

  /**
   * @param AvroSchema $field_schema
   * @param null|boolean|int|float|string|array $default_value
   * @returns null|boolean|int|float|string|array
   *
   * @throws AvroException if $field_schema type is unknown.
   */
  public function read_default_value($field_schema, $default_value)
  {
    switch($field_schema->type())
    {
      case AvroSchema::NULL_TYPE:
        return null;
      case AvroSchema::BOOLEAN_TYPE:
        return $default_value;
      case AvroSchema::INT_TYPE:
      case AvroSchema::LONG_TYPE:
        return (int) $default_value;
      case AvroSchema::FLOAT_TYPE:
      case AvroSchema::DOUBLE_TYPE:
        return (float) $default_value;
      case AvroSchema::STRING_TYPE:
      case AvroSchema::BYTES_TYPE:
        return $default_value;
      case AvroSchema::ARRAY_SCHEMA:
        $array = array();
        foreach ($default_value as $json_val)
          $array []= $this->read_default_value($field_schema->items(), $json_val);
        return $array;
      case AvroSchema::MAP_SCHEMA:
        $map = array();
        foreach ($default_value as $key => $json_val)
          $map[$key] = $this->read_default_value($field_schema->values(),
                                                 $json_val);
        return $map;
      case AvroSchema::UNION_SCHEMA:
        // A union's default corresponds to its first branch.
        return $this->read_default_value($field_schema->schema_by_index(0),
                                         $default_value);
      case AvroSchema::ENUM_SCHEMA:
      case AvroSchema::FIXED_SCHEMA:
        return $default_value;
      case AvroSchema::RECORD_SCHEMA:
        $record = array();
        foreach ($field_schema->fields() as $field)
        {
          $field_name = $field->name();
          // array_key_exists: a present-but-falsy value (0, false, "") is a
          // real default, and a missing key must not raise a notice.
          if (is_array($default_value)
              && array_key_exists($field_name, $default_value))
            $json_val = $default_value[$field_name];
          else
            $json_val = $field->default_value();
          $record[$field_name] = $this->read_default_value($field->type(),
                                                           $json_val);
        }
        return $record;
      default:
        throw new AvroException(sprintf('Unknown type: %s', $field_schema->type()));
    }
  }

  /**
   * Consumes one value encoded with $writers_schema without building it.
   * Only public decoder methods are used.
   *
   * @param AvroSchema $writers_schema
   * @param AvroIOBinaryDecoder $decoder
   * @throws AvroException if the schema type is unknown
   */
  private function skip_data($writers_schema, $decoder)
  {
    switch ($writers_schema->type())
    {
      case AvroSchema::NULL_TYPE:
        return $decoder->skip_null();
      case AvroSchema::BOOLEAN_TYPE:
        return $decoder->skip_boolean();
      case AvroSchema::INT_TYPE:
      case AvroSchema::LONG_TYPE:
        // Ints and longs share the varint encoding; decode and discard.
        $decoder->read_long();
        return;
      case AvroSchema::FLOAT_TYPE:
        return $decoder->skip_float();
      case AvroSchema::DOUBLE_TYPE:
        return $decoder->skip_double();
      case AvroSchema::STRING_TYPE:
        return $decoder->skip_string();
      case AvroSchema::BYTES_TYPE:
        return $decoder->skip_bytes();
      case AvroSchema::ARRAY_SCHEMA:
        return $this->skip_array($writers_schema, $decoder);
      case AvroSchema::MAP_SCHEMA:
        return $this->skip_map($writers_schema, $decoder);
      case AvroSchema::UNION_SCHEMA:
        return $this->skip_union($writers_schema, $decoder);
      case AvroSchema::ENUM_SCHEMA:
        // Enums are encoded as an int index.
        $decoder->read_long();
        return;
      case AvroSchema::FIXED_SCHEMA:
        return $decoder->skip($writers_schema->size());
      case AvroSchema::RECORD_SCHEMA:
      case AvroSchema::ERROR_SCHEMA:
      case AvroSchema::REQUEST_SCHEMA:
        return $this->skip_record($writers_schema, $decoder);
      default:
        throw new AvroException(sprintf('Unknown schema type: %s',
                                        $writers_schema->type()));
    }
  }

  /**
   * Skips array blocks until a zero count. Blocks with a negative count
   * carry a byte size, so they are seeked past in one step.
   */
  private function skip_array($writers_schema, $decoder)
  {
    $block_count = $decoder->read_long();
    while (0 != $block_count)
    {
      if ($block_count < 0)
        $decoder->skip($decoder->read_long());
      else
        for ($i = 0; $i < $block_count; $i++)
          $this->skip_data($writers_schema->items(), $decoder);
      $block_count = $decoder->read_long();
    }
  }

  /**
   * Skips map blocks until a zero count. Negative-count blocks are seeked
   * past by byte size; otherwise each string key and value is skipped.
   */
  private function skip_map($writers_schema, $decoder)
  {
    $block_count = $decoder->read_long();
    while (0 != $block_count)
    {
      if ($block_count < 0)
        $decoder->skip($decoder->read_long());
      else
        for ($i = 0; $i < $block_count; $i++)
        {
          $decoder->skip_string();
          $this->skip_data($writers_schema->values(), $decoder);
        }
      $block_count = $decoder->read_long();
    }
  }

  /**
   * Reads the branch index, then skips a value of that branch.
   */
  private function skip_union($writers_schema, $decoder)
  {
    $index = $decoder->read_long();
    $this->skip_data($writers_schema->schema_by_index($index), $decoder);
  }

  /**
   * Skips every field of a record in schema order.
   */
  private function skip_record($writers_schema, $decoder)
  {
    foreach ($writers_schema->fields() as $field)
      $this->skip_data($field->type(), $decoder);
  }
}
/**
 * Decodes and reads Avro data from an AvroIO object encoded using
 * Avro binary encoding.
 */
class AvroIOBinaryDecoder
{
  /**
   * Decodes a zig-zag varint from its byte values.
   *
   * @param int[] array of byte ascii values
   * @returns long decoded value
   * @internal Requires 64-bit platform
   */
  public static function decode_long_from_array($bytes)
  {
    $b = array_shift($bytes);
    $n = $b & 0x7f;
    $shift = 7;
    while (0 != ($b & 0x80))
    {
      $b = array_shift($bytes);
      $n |= (($b & 0x7f) << $shift);
      $shift += 7;
    }
    // Zig-zag decode with an UNSIGNED shift: PHP's >> is arithmetic, so mask
    // off the sign bit that would otherwise be smeared in when bit 63 of $n
    // is set (e.g. the encodings of PHP_INT_MAX and PHP_INT_MIN).
    return ((($n >> 1) & PHP_INT_MAX) ^ -($n & 1));
  }

  /**
   * Performs decoding of the binary string to a float value.
   *
   * XXX: This is <b>not</b> endian-aware! See comments in
   * {@link AvroIOBinaryEncoder::float_to_int_bits()} for details.
   *
   * @param string $bits
   * @returns float
   */
  static public function int_bits_to_float($bits)
  {
    $float = unpack('f', $bits);
    return (float) $float[1];
  }

  /**
   * Performs decoding of the binary string to a double value.
   *
   * XXX: This is <b>not</b> endian-aware! See comments in
   * {@link AvroIOBinaryEncoder::float_to_int_bits()} for details.
   *
   * @param string $bits
   * @returns float
   */
  static public function long_bits_to_double($bits)
  {
    $double = unpack('d', $bits);
    return (double) $double[1];
  }

  /**
   * @var AvroIO source of encoded bytes
   */
  private $io;

  /**
   * @param AvroIO $io object from which to read.
   */
  public function __construct($io)
  {
    Avro::check_platform();
    $this->io = $io;
  }

  /**
   * @returns string the next byte from $this->io.
   */
  private function next_byte() { return $this->read(1); }

  /**
   * Null occupies no bytes; nothing is consumed.
   * @returns null
   */
  public function read_null() { return null; }

  /**
   * @returns boolean true if the next byte is 1
   */
  public function read_boolean()
  {
    return (boolean) (1 == ord($this->next_byte()));
  }

  /**
   * @returns int
   */
  public function read_int() { return (int) $this->read_long(); }

  /**
   * Collects varint bytes until one lacks the continuation bit, then
   * decodes them (via AvroGMP when Avro::uses_gmp() is true).
   *
   * @returns int|string
   */
  public function read_long()
  {
    $byte = ord($this->next_byte());
    $bytes = array($byte);
    while (0 != ($byte & 0x80))
    {
      $byte = ord($this->next_byte());
      $bytes []= $byte;
    }

    if (Avro::uses_gmp())
      return AvroGMP::decode_long_from_array($bytes);

    return self::decode_long_from_array($bytes);
  }

  /**
   * @returns float
   */
  public function read_float()
  {
    return self::int_bits_to_float($this->read(4));
  }

  /**
   * @returns double
   */
  public function read_double()
  {
    return self::long_bits_to_double($this->read(8));
  }

  /**
   * A string is encoded as a long followed by that many bytes
   * of UTF-8 encoded character data.
   * @returns string
   */
  public function read_string() { return $this->read_bytes(); }

  /**
   * @returns string bytes whose length is given by a preceding long
   */
  public function read_bytes() { return $this->read($this->read_long()); }

  /**
   * @param int $len count of bytes to read
   * @returns string
   */
  public function read($len) { return $this->io->read($len); }

  public function skip_null() { return null; }

  public function skip_boolean() { return $this->skip(1); }

  public function skip_int() { return $this->skip_long(); }

  /**
   * Consumes one varint without decoding it.
   *
   * Public because AvroIODatumReader calls it when skipping fields.
   */
  public function skip_long()
  {
    // next_byte() returns a one-character string; ord() is required before
    // testing the continuation bit (a string & int is always 0 here).
    $b = ord($this->next_byte());
    while (0 != ($b & 0x80))
      $b = ord($this->next_byte());
  }

  public function skip_float() { return $this->skip(4); }

  public function skip_double() { return $this->skip(8); }

  public function skip_bytes() { return $this->skip($this->read_long()); }

  public function skip_string() { return $this->skip_bytes(); }

  /**
   * @param int $len count of bytes to skip
   * @uses AvroIO::seek()
   */
  public function skip($len) { $this->seek($len, AvroIO::SEEK_CUR); }

  /**
   * @returns int position of pointer in AvroIO instance
   * @uses AvroIO::tell()
   */
  private function tell() { return $this->io->tell(); }

  /**
   * @param int $offset
   * @param int $whence
   * @returns boolean true upon success
   * @uses AvroIO::seek()
   */
  private function seek($offset, $whence)
  {
    return $this->io->seek($offset, $whence);
  }
}